# Customizing Pipelines and Data Workflows: Advanced Models and Efficient Processing

This notebook contains all examples from Chapter 8 with step-by-step explanations.

## Table of Contents
1. [Environment Setup](#environment-setup)
2. [Pipeline Basics and Customization](#pipeline-basics)
3. [Efficient Data Handling](#data-handling)
4. [Optimization Techniques](#optimization)
5. [Synthetic Data Generation](#synthetic-data)
6. [Production Workflows](#production)

---

## Environment Setup <a id='environment-setup'></a>

First, let's set up our environment with all necessary imports and configurations.

In [None]:
# Core imports
import os
import sys
import time
import torch
import numpy as np
from pathlib import Path
from typing import List, Dict, Any
from contextlib import contextmanager

# Add src to path for local imports
sys.path.append('../src')

# Transformers imports
from transformers import (
    pipeline,
    Pipeline,
    AutoModel,
    AutoTokenizer,
    AutoModelForSequenceClassification,
    AutoModelForCausalLM
)
from transformers.pipelines import PIPELINE_REGISTRY  # registry used to add custom pipeline tasks
from transformers.utils import logging

# Datasets
from datasets import load_dataset, Dataset

# For optimization examples
from peft import LoraConfig, get_peft_model, TaskType

# For synthetic data generation
from diffusers import DiffusionPipeline

print("Environment ready!")
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
print(f"Device: {torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'CPU'}")

### Device Configuration

Let's implement the cross-platform device detection from the chapter.

In [None]:
def get_optimal_device() -> torch.device:
    """Automatically detect best available device."""
    if torch.cuda.is_available():
        return torch.device("cuda")
    elif torch.backends.mps.is_available():  # Apple Silicon
        return torch.device("mps")
    else:
        return torch.device("cpu")

DEVICE = get_optimal_device()
print(f"Using device: {DEVICE}")

## Pipeline Basics and Customization <a id='pipeline-basics'></a>

### Quick Start: Modern Pipeline Usage

Let's start with the basic pipeline example from the chapter.

In [None]:
# Modern quick-start with explicit model and device
clf = pipeline(
    'sentiment-analysis',
    model='distilbert-base-uncased-finetuned-sst-2-english',
    device=0 if DEVICE.type == "cuda" else -1  # 0 for CUDA GPU, -1 for CPU
)

# Run prediction on text
result = clf('I love Hugging Face!')
print(result)
# Expected output (score approximate): [{'label': 'POSITIVE', 'score': 0.9998}]

# Let's try multiple examples
texts = [
    "I love this product!",
    "This is terrible.",
    "Not bad, but could be better."
]
results = clf(texts)
for text, result in zip(texts, results):
    print(f"Text: '{text}'")
    print(f"  Sentiment: {result['label']} (confidence: {result['score']:.3f})\n")

### Custom Preprocessing

Now let's add custom preprocessing to normalize text before inference.

In [None]:
def custom_preprocess(text):
    """Normalize text for consistent predictions."""
    import string
    text = text.lower()
    return text.translate(str.maketrans('', '', string.punctuation))

# Test preprocessing
texts = ["Wow! Amazing product!!!", "I don't like this..."]
print("Original texts:", texts)

# Clean then predict
cleaned = [custom_preprocess(t) for t in texts]
print("Cleaned texts:", cleaned)

# Batch processing for speed
results = clf(cleaned, batch_size=16)
print("\nResults after preprocessing:")
for original, clean, result in zip(texts, cleaned, results):
    print(f"Original: '{original}'")
    print(f"Cleaned: '{clean}'")
    print(f"Result: {result}\n")
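`custom_preprocess` removes every punctuation character, including apostrophes ("don't" becomes "dont") and exclamation marks that may carry sentiment. If your inputs come from web pages, a gentler normalizer could be preferable. The sketch below is one option under that assumption: `normalize_review` is a hypothetical helper that drops simple HTML tags with a regex (not a full HTML parser), unescapes entities, collapses whitespace, and lowercases, while keeping punctuation.

```python
import html
import re

_TAG_RE = re.compile(r"<[^>]+>")
_WS_RE = re.compile(r"\s+")

def normalize_review(text: str) -> str:
    """Hypothetical helper: strip simple HTML tags, unescape entities, collapse whitespace, lowercase."""
    text = _TAG_RE.sub(" ", text)          # drop tags such as <p> or <br/> (regex, not a real HTML parser)
    text = html.unescape(text)             # &amp; -> &, &#39; -> ', &nbsp; -> non-breaking space
    text = _WS_RE.sub(" ", text).strip()   # \s also matches the non-breaking space
    return text.lower()

print(normalize_review("<p>Wow!&nbsp;Amazing   product!!!</p>"))
print(normalize_review("I don&#39;t like <b>this</b>..."))
```

The cleaned strings can be passed to `clf` exactly as above. Whether keeping punctuation helps depends on the model, so it is worth comparing both variants on a small labeled sample.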

### Advanced: Pipeline Subclassing

Create a reusable pipeline with built-in preprocessing and postprocessing.

In [None]:
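import re
import string
from transformers import TextClassificationPipeline
from transformers.pipelines import PIPELINE_REGISTRY

# Subclass a concrete task pipeline: the base Pipeline class leaves preprocess/_forward/postprocess abstract
class CustomSentimentPipeline(TextClassificationPipeline):
    def preprocess(self, inputs, **tokenizer_kwargs):
        """Strip HTML tags, lowercase, and remove punctuation before tokenizing."""
        if isinstance(inputs, str):
            text = re.sub(r"<[^>]+>", " ", inputs).lower()
            inputs = text.translate(str.maketrans('', '', string.punctuation))
        return super().preprocess(inputs, **tokenizer_kwargs)

    def postprocess(self, model_outputs, **kwargs):
        """Add a 'confident' flag for scores above 0.95."""
        results = super().postprocess(model_outputs, **kwargs)
        # The parent returns one dict by default, or a list of dicts when top_k is set
        for r in (results if isinstance(results, list) else [results]):
            r['confident'] = r['score'] > 0.95
        return results

# Register the class under a custom task name so pipeline() can build it
PIPELINE_REGISTRY.register_pipeline(
    "custom-sentiment",
    pipeline_class=CustomSentimentPipeline,
    pt_model=AutoModelForSequenceClassification,
)

custom_clf = pipeline(
    "custom-sentiment",
    model='distilbert-base-uncased-finetuned-sst-2-english',
    device=0 if DEVICE.type == "cuda" else -1
)
print(custom_clf("<p>Wow! Amazing product!!!</p>"))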

### Inspecting Pipeline Components

Let's peek under the hood to understand pipeline anatomy.

In [None]:
# Inspect pipeline components
print('Model:', type(clf.model).__name__)
print('Tokenizer:', type(clf.tokenizer).__name__)  
print('Processor:', getattr(clf, 'processor', None))
print('Framework:', clf.framework)
print('Device:', clf.device)

# Let's look at model details
print(f"\nModel architecture: {clf.model.config.model_type}")
print(f"Hidden size: {clf.model.config.hidden_size}")
print(f"Number of labels: {clf.model.config.num_labels}")

# Tokenizer details
print(f"\nTokenizer vocab size: {clf.tokenizer.vocab_size}")
print(f"Max length: {clf.tokenizer.model_max_length}")

### Composing Multiple Pipelines

Let's create a combined sentiment + NER pipeline.

In [None]:
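# Reuse the sentiment pipeline from above and pin the NER model explicitly
sentiment_pipe = clf
ner_pipe = pipeline(
    'ner',
    model='dbmdz/bert-large-cased-finetuned-conll03-english',
    aggregation_strategy='simple',  # merge sub-word tokens into whole entities
    device=0 if DEVICE.type == "cuda" else -1
)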

def combined_analysis(text):
    """Combine sentiment and NER analysis."""
    sentiment = sentiment_pipe(text)
    entities = ner_pipe(text)
    
    return {
        "text": text,
        "sentiment": sentiment[0],
        "entities": entities
    }

# Test combined analysis
test_text = "Apple Inc. makes amazing products! I love my iPhone."
result = combined_analysis(test_text)

print(f"Text: {result['text']}")
print(f"\nSentiment: {result['sentiment']['label']} ({result['sentiment']['score']:.3f})")
print("\nEntities found:")
for entity in result['entities']:
    print(f"  - {entity['word']}: {entity['entity_group'] if 'entity_group' in entity else entity['entity']}")

### Debugging Pipelines

Enable verbose logging to debug pipeline issues.

In [None]:
# Enable debug logging
logging.set_verbosity_debug()

# Now pipeline operations will show detailed logs
print("Debug mode enabled. Running pipeline...")
result = clf("Debug me!")
print(f"\nResult: {result}")

# Reset logging to normal
logging.set_verbosity_warning()
print("\nLogging reset to normal level.")

## Efficient Data Handling with 🤗 Datasets <a id='data-handling'></a>

### Loading and Transforming Datasets

In [None]:
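# Load a small slice for demonstration. The IMDB train split is ordered by label,
# so the first 1000 examples are all negative; shuffle the full split if you need both classes.
dataset = load_dataset('imdb', split='train[:1000]')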
print(f"Dataset size: {len(dataset)}")
print(f"First example: {dataset[0]}")
print(f"\nFeatures: {dataset.features}")

In [None]:
# Define preprocessing function
def preprocess_batch(batch):
    """Process entire batches at once."""
    batch['text'] = [text.lower() for text in batch['text']]
    batch['length'] = [len(text.split()) for text in batch['text']]
    return batch

# Transform with parallel processing
print("Transforming dataset...")
start_time = time.time()
dataset = dataset.map(preprocess_batch, batched=True, num_proc=4)
print(f"Transformation completed in {time.time() - start_time:.2f} seconds")

# Filter short reviews
print(f"\nDataset before filtering: {len(dataset)} examples")
dataset = dataset.filter(lambda x: x['length'] > 20)
print(f"Dataset after filtering: {len(dataset)} examples")

# Check the new features
print(f"\nUpdated features: {dataset.features}")
print(f"Example with new features: {dataset[0]}")
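With `batched=True`, `Dataset.map` passes the function a dict that maps each column name to a list of values (up to 1,000 rows per call by default) and expects lists back. The pure-Python sketch below only imitates that contract to make it concrete. `simulate_batched_map` is a hypothetical helper, not part of 🤗 Datasets, which additionally handles caching, Arrow storage, and multiprocessing.

```python
from typing import Any, Callable, Dict, List

def simulate_batched_map(rows: List[Dict[str, Any]],
                         fn: Callable[[Dict[str, List[Any]]], Dict[str, List[Any]]],
                         batch_size: int = 1000) -> List[Dict[str, Any]]:
    """Hypothetical stand-in for Dataset.map(batched=True) over a list of row dicts."""
    out: List[Dict[str, Any]] = []
    for start in range(0, len(rows), batch_size):
        chunk = rows[start:start + batch_size]
        # Rows -> columns: {"text": [...], ...}
        batch = {key: [row[key] for row in chunk] for key in chunk[0]}
        result = fn(batch)
        n = len(next(iter(result.values()), []))
        # Columns -> rows (the function may add columns, as preprocess_batch does)
        out.extend({key: result[key][i] for key in result} for i in range(n))
    return out

def preprocess_batch(batch):
    """Same batched function as above: receives and returns lists per column."""
    batch['text'] = [text.lower() for text in batch['text']]
    batch['length'] = [len(text.split()) for text in batch['text']]
    return batch

rows = [{"text": "Great Movie"}, {"text": "Too Long And Boring"}, {"text": "Meh"}]
print(simulate_batched_map(rows, preprocess_batch, batch_size=2))
```

Writing the function against lists rather than single rows is what lets the real `map` amortize Python overhead across many examples per call.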

### Streaming Large Datasets

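For datasets too large to download or hold in memory, `streaming=True` returns an `IterableDataset` that yields examples lazily as you iterate, instead of preparing the whole dataset up front.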

In [None]:
# Create a sample CSV for demonstration
import csv

sample_data = [
    {"text": "This product is amazing!", "label": "positive"},
    {"text": "Terrible experience.", "label": "negative"},
    {"text": "Good value for money.", "label": "positive"},
    {"text": "Not worth the price.", "label": "negative"},
    {"text": "Excellent quality!", "label": "positive"}
] * 20  # Repeat for larger dataset

csv_path = "sample_reviews.csv"
with open(csv_path, 'w', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=['text', 'label'])
    writer.writeheader()
    writer.writerows(sample_data)

# Stream the dataset
streaming_dataset = load_dataset('csv', data_files=csv_path, split='train', streaming=True)

# Process in batches
batch_size = 32
batch = []
processed_count = 0

print("Processing streaming dataset...")
# take(100) caps the demo at 100 examples
for example in streaming_dataset.take(100):
    batch.append(custom_preprocess(example['text']))
    
    if len(batch) == batch_size:
        # Process a full batch
        results = clf(batch, batch_size=batch_size)
        processed_count += len(batch)
        print(f"Processed {processed_count} examples...")
        batch = []

# Process the final partial batch (here 100 = 3 x 32 + 4, so 4 examples remain)
if batch:
    results = clf(batch, batch_size=batch_size)
    processed_count += len(batch)

print(f"\nTotal processed: {processed_count} examples")

# Clean up
os.remove(csv_path)
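Managing the `batch` list by hand makes it easy to drop or mishandle the final partial batch. A small generator that yields fixed-size chunks, including a shorter last one, avoids that bookkeeping. This is a sketch built only on `itertools.islice`; the name `iter_batches` is hypothetical (Python 3.12 also ships `itertools.batched`, which yields tuples).

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def iter_batches(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield lists of up to `batch_size` items; the last batch may be shorter."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    it = iter(items)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch

print([len(b) for b in iter_batches(range(100), 32)])
```

Inside the streaming cell (before the CSV is removed), a loop such as `for batch in iter_batches((custom_preprocess(ex['text']) for ex in streaming_dataset), 32): clf(batch, batch_size=32)` would score every row, including the remainder.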

### Creating Custom Datasets

In [None]:
# Create a custom dataset from dictionaries
custom_data = {
    "text": [
        "The future of AI is bright and full of possibilities.",
        "Machine learning transforms how we solve complex problems.",
        "Deep learning models continue to improve rapidly.",
        "Natural language processing enables better human-computer interaction.",
        "Computer vision applications are becoming more sophisticated."
    ],
    "category": ["future", "ml", "dl", "nlp", "cv"]
}

custom_dataset = Dataset.from_dict(custom_data)
print(f"Custom dataset: {custom_dataset}")
print(f"\nFirst example: {custom_dataset[0]}")

# Apply transformations
def add_metadata(example):
    example['word_count'] = len(example['text'].split())
    example['char_count'] = len(example['text'])
    return example

custom_dataset = custom_dataset.map(add_metadata)
print(f"\nAfter transformation: {custom_dataset[0]}")

## Optimization Techniques <a id='optimization'></a>

### Batching for Higher Throughput

In [None]:
# Prepare test data
test_texts = [
    "Review 1: This product exceeded my expectations.",
    "Review 2: Not satisfied with the quality.",
    "Review 3: Average product, nothing special.",
    "Review 4: Absolutely love it!",
    "Review 5: Waste of money.",
    "Review 6: Good value for the price.",
    "Review 7: Would recommend to friends.",
    "Review 8: Poor customer service."
] * 4  # Repeat for more examples

# Method 1: One by one (slow)
print("Method 1: Processing one by one...")
start_time = time.time()
results_single = []
for text in test_texts[:8]:  # Process only first 8 for demo
    result = clf(text)
    results_single.append(result)
single_time = time.time() - start_time
print(f"Time taken: {single_time:.3f} seconds")
print(f"Average per text: {single_time/8*1000:.1f} ms")

# Method 2: Batch processing (fast)
print("\nMethod 2: Batch processing...")
start_time = time.time()
results_batch = clf(test_texts,
                    batch_size=16,  # without batch_size the pipeline runs one example per forward pass
                    padding=True,
                    truncation=True,
                    max_length=128)
batch_time = time.time() - start_time
print(f"Time taken: {batch_time:.3f} seconds")
print(f"Average per text: {batch_time/len(test_texts)*1000:.1f} ms")

# Estimate speedup by extrapolating the one-by-one timing to all texts
speedup = (single_time/8*len(test_texts)) / batch_time
print(f"\nSpeedup: {speedup:.1f}x faster with batching!")
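Batching pays off because many texts share one forward pass, but with `padding=True` every sequence in a batch is padded to the longest one, so mixing very short and very long texts wastes compute. The sketch below estimates the padded fraction for a list of token lengths. `padding_fraction` is a hypothetical helper and the lengths are made-up illustration values, not measurements.

```python
from typing import List

def padding_fraction(lengths: List[int], batch_size: int) -> float:
    """Fraction of token slots that are padding when each batch pads to its longest item."""
    total_slots = 0
    real_tokens = sum(lengths)
    for start in range(0, len(lengths), batch_size):
        batch = lengths[start:start + batch_size]
        total_slots += max(batch) * len(batch)  # every item padded to the batch maximum
    return 1 - real_tokens / total_slots if total_slots else 0.0

lengths = [8, 120, 10, 115, 9, 118, 12, 110]  # illustrative token counts
print(f"Unsorted: {padding_fraction(lengths, 4):.1%}")
print(f"Sorted:   {padding_fraction(sorted(lengths), 4):.1%}")
```

Grouping texts of similar length (and restoring the original order afterward) is a common way to cut that waste; how much it helps depends on how varied your inputs are.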

### Modern Quantization

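Measure the baseline model size and outline quantization options for cost reduction. Loading a model in 8-bit or 4-bit requires the `bitsandbytes` library and, typically, a compatible GPU, so this cell stays conceptual.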

In [None]:
# Load a small model for demonstration
model_name = "distilbert-base-uncased"

print("Loading standard model...")
model = AutoModelForSequenceClassification.from_pretrained(model_name)

# Calculate model size
param_size = sum(p.numel() * p.element_size() for p in model.parameters())
buffer_size = sum(b.numel() * b.element_size() for b in model.buffers())
model_size = param_size + buffer_size
print(f"Standard model size: {model_size / 1024 / 1024:.1f} MB")

# Count parameters
total_params = sum(p.numel() for p in model.parameters())
print(f"Total parameters: {total_params:,}")

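# Note: actual 8-bit/4-bit loading requires the bitsandbytes library and typically a compatible GPU,
# so this is a conceptual overview. Size reductions are relative to FP32 weights.
print("\nQuantization options:")
print("- INT8: ~75% smaller weights than FP32, usually minimal accuracy loss")
print("- INT4: ~87.5% smaller weights than FP32, may need evaluation or fine-tuning to recover accuracy")
print("- Dynamic quantization: weights quantized ahead of time, activation scales computed at runtime")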
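The size figures follow from bytes per weight: FP32 uses 4 bytes, INT8 one byte, and INT4 half a byte, ignoring the small overhead of scale factors. The pure-Python sketch below shows that arithmetic plus the core idea of symmetric per-tensor INT8 quantization (scale = max|w| / 127). It is a toy illustration with hypothetical function names, not how bitsandbytes or PyTorch implement their kernels.

```python
from typing import List, Tuple

def size_mb(num_params: int, bits: int) -> float:
    """Weight storage in MB for a given bit width (ignores scale/zero-point overhead)."""
    return num_params * bits / 8 / 1024 / 1024

def quantize_int8(weights: List[float]) -> Tuple[List[int], float]:
    """Symmetric per-tensor INT8: map [-max|w|, max|w|] onto integers in [-127, 127]."""
    max_abs = max(abs(w) for w in weights)
    scale = max_abs / 127 if max_abs > 0 else 1.0
    q = [max(-127, min(127, round(w / scale))) for w in weights]
    return q, scale

def dequantize(q: List[int], scale: float) -> List[float]:
    """Approximate reconstruction; per-weight error is at most scale / 2."""
    return [v * scale for v in q]

n = 66_000_000  # illustrative parameter count, roughly DistilBERT-sized
for bits in (32, 8, 4):
    print(f"{bits:>2}-bit: {size_mb(n, bits):7.1f} MB ({1 - bits / 32:.1%} smaller than FP32)")

w = [0.42, -1.27, 0.003, 0.88]
q, scale = quantize_int8(w)
print(q, [round(x, 3) for x in dequantize(q, scale)])
```

Production schemes usually use finer-grained scales (per channel or per block) to limit error, so always check accuracy on your own evaluation set. For a CPU-only experiment with a real model, PyTorch provides `torch.ao.quantization.quantize_dynamic`.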

### Memory Tracking Utility

Implement the memory tracking context manager from the chapter.

In [None]:
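@contextmanager
def track_memory(device: str = "cuda"):
    """Context manager for GPU memory profiling (net change and peak during the block)."""
    if device == "cuda" and torch.cuda.is_available():
        torch.cuda.synchronize()
        torch.cuda.reset_peak_memory_stats()
        start_memory = torch.cuda.memory_allocated()
        try:
            yield
        finally:
            # Report even if the block raises
            torch.cuda.synchronize()
            end_memory = torch.cuda.memory_allocated()
            peak_memory = torch.cuda.max_memory_allocated()
            print(f"Net memory change: {(end_memory - start_memory) / 1024 / 1024:.2f} MB")
            print(f"Peak memory above start: {(peak_memory - start_memory) / 1024 / 1024:.2f} MB")
    else:
        print("Memory tracking only available for CUDA devices")
        yield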

# Example usage
print("Testing memory tracking...")
with track_memory(device=DEVICE.type):
    # Run some inference
    _ = clf("Test text for memory tracking")

### PEFT/LoRA Concept

Demonstrate Parameter-Efficient Fine-Tuning concepts.

In [None]:
# PEFT configuration example (conceptual)
from peft import LoraConfig, TaskType

# Define LoRA configuration
peft_config = LoraConfig(
    task_type=TaskType.SEQ_CLS,  # Sequence classification
    r=16,  # LoRA rank
    lora_alpha=32,
    lora_dropout=0.1,
    target_modules=["query", "value"]  # BERT attention projections; DistilBERT names them "q_lin"/"v_lin"
)

print("LoRA Configuration:")
print(f"  Rank (r): {peft_config.r}")
print(f"  Alpha: {peft_config.lora_alpha}")
print(f"  Dropout: {peft_config.lora_dropout}")
print(f"  Target modules: {peft_config.target_modules}")

# Approximate trainable parameters for BERT-base (12 layers, hidden size 768):
# each adapted 768x768 projection adds r * (768 + 768) = 16 * 1536 = 24,576 parameters,
# so 2 modules x 12 layers x 24,576 ≈ 0.59M (plus the small classification head)
print("\nParameter efficiency:")
print("  Original BERT-base: ~110M parameters")
print("  LoRA trainable (r=16, query + value): ~0.59M parameters")
print("  Reduction: ~99.5% fewer trainable parameters")
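To check the trainable-parameter estimate for your own configuration, the count follows from how LoRA works: a frozen weight W of shape d_out × d_in receives a low-rank update (lora_alpha / r) · B A, where A is r × d_in and B is d_out × r, so each adapted module adds r · (d_in + d_out) trainable parameters. The plain-Python sketch below assumes BERT-base shapes (12 layers, hidden size 768, square query/value projections) and ignores the classification head, which PEFT typically keeps trainable for sequence classification. `lora_trainable_params` is a hypothetical helper.

```python
def lora_trainable_params(num_layers: int, d_in: int, d_out: int,
                          modules_per_layer: int, r: int) -> int:
    """Trainable LoRA parameters: each adapted module adds A (r x d_in) and B (d_out x r)."""
    per_module = r * d_in + d_out * r
    return num_layers * modules_per_layer * per_module

# Assumed BERT-base shapes: 12 layers, hidden size 768, LoRA on query + value
lora = lora_trainable_params(num_layers=12, d_in=768, d_out=768, modules_per_layer=2, r=16)
base = 110_000_000  # approximate BERT-base size
print(f"LoRA params: {lora:,} ({lora / base:.2%} of ~110M)")
```

The `lora_alpha / r` factor (32 / 16 = 2 here) only scales the update; it does not change the parameter count. Halving `r` halves the count.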

## Synthetic Data Generation <a id='synthetic-data'></a>

### Text Generation with LLMs

In [None]:
# Load a small text generation model
print("Loading text generation model...")
gen = pipeline(
    'text-generation',
    model='gpt2',  # Using smaller model for demo
    device=0 if DEVICE.type == "cuda" else -1
)

# Generate product reviews
prompts = [
    "This smartphone is",
    "The laptop performance is",
    "Customer service was"
]

print("Generating synthetic reviews...\n")
for prompt in prompts:
    generated = gen(
        prompt,
        max_new_tokens=30,
        num_return_sequences=2,
        do_sample=True,  # sampling is needed for temperature to apply and for distinct sequences
        temperature=0.8,
        pad_token_id=gen.tokenizer.eos_token_id
    )
    
    print(f"Prompt: '{prompt}'")
    for i, g in enumerate(generated):
        print(f"  Generated {i+1}: {g['generated_text']}")
    print()

### Synthetic Data Validation

Implement quality checks for synthetic data.

In [None]:
def validate_synthetic_text(texts: List[str]) -> Dict[str, Any]:
    """Basic validation for synthetic text data."""
    results = {
        "total": len(texts),
        "valid": 0,
        "issues": []
    }
    
    for text in texts:
        issues = []
        
        # Check length
        if len(text.split()) < 5:
            issues.append("too_short")
        elif len(text.split()) > 200:
            issues.append("too_long")
        
        # Check for repetition
        words = text.lower().split()
        if len(words) > 0 and len(set(words)) / len(words) < 0.5:
            issues.append("repetitive")
        
        # Check for truncation
        if text.endswith("...") or not text.endswith(('.', '!', '?')):
            issues.append("truncated")
        
        if not issues:
            results["valid"] += 1
        else:
            results["issues"].extend(issues)
    
    results["validity_rate"] = results["valid"] / results["total"] if results["total"] else 0.0
    return results

# Test validation
synthetic_samples = [
    "This product is amazing and works perfectly!",
    "Good good good good good.",
    "The laptop",
    "Excellent quality and fast shipping. Would buy again.",
    "This is a test that ends abruptly and"
]

validation_results = validate_synthetic_text(synthetic_samples)
print("Validation Results:")
print(f"  Total samples: {validation_results['total']}")
print(f"  Valid samples: {validation_results['valid']}")
print(f"  Validity rate: {validation_results['validity_rate']:.1%}")
print(f"  Issues found: {set(validation_results['issues'])}")
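`validate_synthetic_text` reports aggregate statistics, but when assembling a training set you usually want to keep only the samples that pass. The sketch below applies the same checks per sample, strips surrounding whitespace first (generated text often ends with spaces or newlines, which would trip the punctuation check), and drops case-insensitive exact duplicates. The helper names `sample_issues` and `keep_valid` are hypothetical, and duplicate removal is an added assumption about what you want.

```python
from typing import List, Tuple

def sample_issues(text: str) -> List[str]:
    """Apply the validate_synthetic_text checks to a single sample."""
    issues: List[str] = []
    words = text.lower().split()
    if len(words) < 5:
        issues.append("too_short")
    elif len(words) > 200:
        issues.append("too_long")
    if words and len(set(words)) / len(words) < 0.5:
        issues.append("repetitive")
    if text.endswith("...") or not text.endswith(('.', '!', '?')):
        issues.append("truncated")
    return issues

def keep_valid(samples: List[str]) -> Tuple[List[str], List[Tuple[str, List[str]]]]:
    """Strip whitespace, drop case-insensitive exact duplicates, keep issue-free samples."""
    kept: List[str] = []
    rejected: List[Tuple[str, List[str]]] = []
    seen = set()
    for raw in samples:
        text = raw.strip()
        key = text.lower()
        if key in seen:
            rejected.append((text, ["duplicate"]))
            continue
        seen.add(key)
        issues = sample_issues(text)
        if issues:
            rejected.append((text, issues))
        else:
            kept.append(text)
    return kept, rejected

kept, rejected = keep_valid([
    "This product is amazing and works perfectly!  \n",
    "this product is amazing and works perfectly!",
    "Good good good good good.",
    "The laptop",
])
print(kept)
print(rejected)
```

With the generation cell above, `keep_valid([g['generated_text'] for g in generated])` would filter one prompt's outputs; passing `return_full_text=False` to the pipeline call returns only the continuation if you do not want the prompt included.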

## Production Workflows <a id='production'></a>

### Complete Production Pipeline Example

Let's implement a simplified version of the RetailReviewWorkflow from the chapter.

In [None]:
class SimpleRetailWorkflow:
    """Simplified production workflow for retail review analysis."""
    
    def __init__(self):
        # Initialize pipelines
        self.sentiment_pipeline = pipeline(
            'sentiment-analysis',
            model='distilbert-base-uncased-finetuned-sst-2-english',
            device=0 if DEVICE.type == "cuda" else -1
        )
        
        # Priority keywords for urgency detection
        self.priority_keywords = {
            "urgent": ["broken", "damaged", "fraud", "stolen", "urgent"],
            "high": ["terrible", "awful", "worst", "refund", "complaint"],
            "medium": ["disappointed", "issue", "problem", "concern"],
            "low": ["suggestion", "feedback", "minor"]
        }
    
    def analyze_priority(self, text: str) -> str:
        """Determine review priority based on keywords."""
        text_lower = text.lower()
        
        for priority, keywords in self.priority_keywords.items():
            if any(keyword in text_lower for keyword in keywords):
                return priority
        
        return "normal"
    
    def _build_result(self, review: str, sentiment: Dict[str, Any]) -> Dict[str, Any]:
        """Combine a sentiment prediction with keyword-based priority."""
        priority = self.analyze_priority(review)
        
        return {
            "text": review,
            "sentiment": sentiment["label"],
            "sentiment_score": sentiment["score"],
            "priority": priority,
            "needs_attention": priority in ["urgent", "high"]
        }
    
    def process_review(self, review: str) -> Dict[str, Any]:
        """Process a single review."""
        return self._build_result(review, self.sentiment_pipeline(review)[0])
    
    def process_batch(self, reviews: List[str], batch_size: int = 16) -> Dict[str, Any]:
        """Process multiple reviews with one batched pipeline call and generate insights."""
        sentiments = self.sentiment_pipeline(reviews, batch_size=batch_size)
        results = [self._build_result(r, s) for r, s in zip(reviews, sentiments)]
        
        # Generate insights
        total = len(results)
        urgent_count = sum(1 for r in results if r["needs_attention"])
        positive_count = sum(1 for r in results if r["sentiment"] == "POSITIVE")
        
        insights = {
            "total_reviews": total,
            "urgent_reviews": urgent_count,
            "sentiment_distribution": {
                "positive": positive_count,
                "negative": total - positive_count,
                "positive_rate": positive_count / total if total > 0 else 0
            },
            "results": results
        }
        
        return insights

# Test the workflow
workflow = SimpleRetailWorkflow()

sample_reviews = [
    "This product is absolutely amazing! Fast shipping and great quality.",
    "Terrible experience. The item arrived broken and customer service was unhelpful.",
    "Good value for money, but packaging could be better.",
    "URGENT: Received wrong item. Need immediate refund!",
    "The product works as described. Delivery was on time."
]

# Process reviews
print("Processing reviews...\n")
start_time = time.time()
insights = workflow.process_batch(sample_reviews)
process_time = time.time() - start_time

# Display results
print("=== WORKFLOW RESULTS ===")
print(f"Total reviews processed: {insights['total_reviews']}")
print(f"Processing time: {process_time:.3f} seconds")
print(f"\nUrgent reviews requiring attention: {insights['urgent_reviews']}")
print(f"Positive sentiment rate: {insights['sentiment_distribution']['positive_rate']:.1%}")

print("\n=== DETAILED RESULTS ===")
for i, result in enumerate(insights['results']):
    if result['needs_attention']:
        print(f"\n⚠️  Review {i+1} (NEEDS ATTENTION):")
    else:
        print(f"\nReview {i+1}:")
    print(f"  Text: '{result['text'][:50]}...'")
    print(f"  Sentiment: {result['sentiment']} ({result['sentiment_score']:.3f})")
    print(f"  Priority: {result['priority'].upper()}")

### Configuration Management

Implement a configuration system with environment variable support.

In [None]:
class Config:
    """Centralized configuration with environment fallbacks."""
    
    # Device configuration with automatic detection
    DEVICE = get_optimal_device()
    
    # Model configurations with env overrides
    DEFAULT_SENTIMENT_MODEL = os.getenv(
        "SENTIMENT_MODEL", 
        "distilbert-base-uncased-finetuned-sst-2-english"
    )
    
    # Performance settings
    BATCH_SIZE = int(os.getenv("BATCH_SIZE", "32"))
    MAX_LENGTH = int(os.getenv("MAX_LENGTH", "512"))
    ENABLE_FLASH_ATTENTION = os.getenv("ENABLE_FLASH_ATTENTION", "true").lower() == "true"
    
    # Directory paths with env overrides (not created automatically; call Path.mkdir before use)
    DATA_PATH = Path(os.getenv("DATA_PATH", "./data"))
    CACHE_DIR = Path(os.getenv("CACHE_DIR", "./cache"))
    
    @classmethod
    def display(cls):
        """Display current configuration."""
        print("Current Configuration:")
        print(f"  Device: {cls.DEVICE}")
        print(f"  Default Model: {cls.DEFAULT_SENTIMENT_MODEL}")
        print(f"  Batch Size: {cls.BATCH_SIZE}")
        print(f"  Max Length: {cls.MAX_LENGTH}")
        print(f"  Flash Attention: {cls.ENABLE_FLASH_ATTENTION}")
        print(f"  Data Path: {cls.DATA_PATH}")
        print(f"  Cache Dir: {cls.CACHE_DIR}")

# Display configuration
Config.display()
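Because `Config` calls `os.getenv` while the class body executes, changing an environment variable after that cell has run has no effect until the class is redefined. One alternative, sketched below under that assumption, is a frozen dataclass built by a function that reads an environment mapping at call time. `AppConfig` and `load_config` are hypothetical names, and only a few of the `Config` fields are shown. Accepting the mapping as an argument also makes the loader easy to test.

```python
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Mapping, Optional

@dataclass(frozen=True)
class AppConfig:
    sentiment_model: str
    batch_size: int
    max_length: int
    cache_dir: Path

def load_config(env: Optional[Mapping[str, str]] = None) -> AppConfig:
    """Read settings from `env` (defaults to os.environ) at call time, with validation."""
    env = os.environ if env is None else env
    batch_size = int(env.get("BATCH_SIZE", "32"))
    if batch_size < 1:
        raise ValueError(f"BATCH_SIZE must be positive, got {batch_size}")
    return AppConfig(
        sentiment_model=env.get("SENTIMENT_MODEL", "distilbert-base-uncased-finetuned-sst-2-english"),
        batch_size=batch_size,
        max_length=int(env.get("MAX_LENGTH", "512")),
        cache_dir=Path(env.get("CACHE_DIR", "./cache")),
    )

print(load_config({"BATCH_SIZE": "64"}))
```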

### Performance Benchmarking

Create a simple benchmarking utility.

In [None]:
def benchmark_pipeline(pipeline_func, inputs: List[str], name: str = "Pipeline",
                       batch_size: int = 16) -> Dict[str, float]:
    """Compare one-call-per-sample latency with batched inference for a Hugging Face pipeline."""
    print(f"\nBenchmarking {name}...")
    n_single = min(10, len(inputs))  # number of samples timed one by one
    
    # Warmup so one-time setup costs don't skew the timings
    _ = pipeline_func(inputs[0])
    
    # Single inference: one pipeline call per sample
    start = time.perf_counter()
    for inp in inputs[:n_single]:
        _ = pipeline_func(inp)
    single_time = time.perf_counter() - start
    
    # Batch inference: one call; batch_size controls how many samples share a forward pass
    start = time.perf_counter()
    _ = pipeline_func(inputs, batch_size=batch_size)
    batch_time = time.perf_counter() - start
    
    metrics = {
        "single_latency_ms": (single_time / n_single) * 1000,
        "batch_latency_ms": (batch_time / len(inputs)) * 1000,
        "throughput_single": n_single / single_time,
        "throughput_batch": len(inputs) / batch_time,
        # Extrapolates the per-sample time to the full input list
        "speedup": (single_time / n_single * len(inputs)) / batch_time
    }
    
    print(f"  Single inference: {metrics['single_latency_ms']:.1f} ms/sample")
    print(f"  Batch inference: {metrics['batch_latency_ms']:.1f} ms/sample")
    print(f"  Throughput (single): {metrics['throughput_single']:.1f} samples/sec")
    print(f"  Throughput (batch): {metrics['throughput_batch']:.1f} samples/sec")
    print(f"  Batch speedup: {metrics['speedup']:.1f}x")
    
    return metrics

# Benchmark our sentiment pipeline
test_inputs = [f"Test review number {i}. This is a sample text." for i in range(50)]
metrics = benchmark_pipeline(clf, test_inputs, "Sentiment Analysis")
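Averages hide tail latency, which often matters more for production service-level targets. The sketch below summarizes per-request timings with `statistics.quantiles` (standard library, Python 3.8+), using the `"inclusive"` method so percentiles stay within the observed range. `latency_summary` and `time_calls` are hypothetical helpers, and the sample data are synthetic numbers, not measurements from this notebook.

```python
import statistics
import time
from typing import Callable, Dict, List

def latency_summary(latencies_ms: List[float]) -> Dict[str, float]:
    """Mean and tail percentiles (inclusive interpolation) for per-request latencies."""
    if len(latencies_ms) < 2:
        raise ValueError("need at least two measurements")
    cuts = statistics.quantiles(latencies_ms, n=100, method="inclusive")  # 99 cut points
    return {
        "mean_ms": statistics.fmean(latencies_ms),
        "p50_ms": cuts[49],
        "p95_ms": cuts[94],
        "p99_ms": cuts[98],
    }

def time_calls(fn: Callable[[str], object], inputs: List[str]) -> List[float]:
    """Time each call individually with a monotonic clock, in milliseconds."""
    out: List[float] = []
    for x in inputs:
        start = time.perf_counter()
        fn(x)
        out.append((time.perf_counter() - start) * 1000)
    return out

print(latency_summary([float(v) for v in range(1, 101)]))
```

With the notebook's objects, `latency_summary(time_calls(clf, test_inputs))` would report the one-request-at-a-time distribution for the sentiment pipeline.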

## Summary and Key Takeaways

In this tutorial notebook, we've covered:

1. **Pipeline Customization**: From basic usage to custom preprocessing and component composition
2. **Efficient Data Handling**: Using 🤗 Datasets for scalable data processing
3. **Optimization Techniques**: Batching, quantization, and memory tracking
4. **Synthetic Data Generation**: Creating and validating synthetic training data
5. **Production Workflows**: Building robust, scalable systems for real-world deployment

### Key Performance Gains:
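- **Batching**: Often several-fold higher throughput, especially on GPU (measure on your own hardware)
- **Quantization**: INT8 weights take ~75% less memory than FP32
- **Streaming**: Handle datasets larger than memory
- **PEFT/LoRA**: Train well under 1% of parameters (~0.5% for BERT-base with r=16 on query/value)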

### Next Steps:
1. Experiment with different models and batch sizes
2. Implement quantization on compatible hardware
3. Build custom pipelines for your specific use case
4. Explore synthetic data generation for your domain

**Remember**: Great AI isn't about using the fanciest models. It's about building robust, efficient workflows that solve real problems!