# Recursive Pipeline Playground

This notebook explores recursive pipeline patterns with objects that have `open()` and `build()` methods. We'll implement a simple recursive pipeline and demonstrate it with an encrypted dictionary example.

## Key Concepts:
- **Recursive Pipeline**: A pipeline where each step can itself be a pipeline
- **Open/Build Pattern**: Objects that can be "opened" for modification and then "built" into final form
- **Encrypted Dict**: A practical example showing secure data transformation

## 1. Define the Pipeline Object

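Let's start with a simple recursive pipeline class. Each `Pipeline` node holds one step and a link to the next node: `then()` recurses to the tail of the chain to append a step, and `process()` applies the node's own step before recursing into `next`.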

In [1]:
class Pipeline:
    """
    Simple recursive pipeline where each step can be a function or another Pipeline
    """
    def __init__(self, step=None):
        self.step = step
        self.next = None
    
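    def then(self, step_or_pipeline):
        """Append a step (function) or another pipeline to the end of the chain"""
        if self.next is None:
            if callable(step_or_pipeline):
                # Wrap a plain function in its own node
                self.next = Pipeline(step_or_pipeline)
            else:
                # Link the pipeline itself (not a copy), so later then() calls extend it
                self.next = step_or_pipeline
        else:
            # Walk to the tail of the chain recursively
            self.next.then(step_or_pipeline)
        return self  # Always return the root pipeline so calls can be chained
    
    def process(self, obj):
        """Apply this node's step, then recurse into the rest of the chain"""
        if self.step is not None:
            obj = self.step(obj)
        if self.next is not None:
            obj = self.next.process(obj)
        return obj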
    
    def __repr__(self):
        steps = []
        current = self
        while current:
            if current.step:
                steps.append(getattr(current.step, '__name__', str(current.step)))
            current = current.next
        return f"Pipeline({' -> '.join(steps)})"

# Test with simple functions
def add_one(x):
    print(f"add_one: {x} -> {x + 1}")
    return x + 1

def multiply_by_two(x):
    print(f"multiply_by_two: {x} -> {x * 2}")
    return x * 2

def subtract_three(x):
    print(f"subtract_three: {x} -> {x - 3}")
    return x - 3

# Create a simple pipeline
simple_pipeline = Pipeline(add_one).then(multiply_by_two).then(subtract_three)
print("Simple pipeline:", simple_pipeline)
print("Result:", simple_pipeline.process(5))

Simple pipeline: Pipeline(add_one -> multiply_by_two -> subtract_three)
add_one: 5 -> 6
multiply_by_two: 6 -> 12
subtract_three: 12 -> 9
Result: 9
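Because `then()` links a `Pipeline` argument directly instead of copying it, a later `.then()` on the outer pipeline recurses into the nested one and appends there. The sketch below uses a trimmed copy of the class above (only `then()` and `process()`) with two hypothetical helper steps, `inc` and `dbl`, to show the effect and one way around it: passing the sub-pipeline's bound `process` method, which `then()` wraps like any other callable.

```python
class Pipeline:
    """Trimmed copy of the notebook's Pipeline (then/process only)."""

    def __init__(self, step=None):
        self.step = step
        self.next = None

    def then(self, step_or_pipeline):
        if self.next is None:
            # Callables get wrapped in a new node; a Pipeline is linked as-is
            self.next = Pipeline(step_or_pipeline) if callable(step_or_pipeline) else step_or_pipeline
        else:
            self.next.then(step_or_pipeline)
        return self

    def process(self, obj):
        if self.step is not None:
            obj = self.step(obj)
        if self.next is not None:
            obj = self.next.process(obj)
        return obj


def inc(x):  # hypothetical helper step
    return x + 1


def dbl(x):  # hypothetical helper step
    return x * 2


# Nesting by linking: `sub` becomes the tail of `outer`
sub = Pipeline(dbl).then(dbl)          # x -> x * 4
outer = Pipeline(inc).then(sub)        # x -> (x + 1) * 4
nested_result = outer.process(1)       # 8

# A later then() on `outer` walks into `sub` and appends there
outer.then(inc)
sub_after = sub.process(1)             # 1 * 2 * 2 + 1 = 5, so `sub` was modified

# Wrapping the sub-pipeline's bound process() keeps it independent
fresh = Pipeline(dbl).then(dbl)
safe = Pipeline(inc).then(fresh.process).then(inc)
safe_result = safe.process(1)          # (1 + 1) * 4 + 1 = 9
fresh_result = fresh.process(1)        # still 1 * 4 = 4

print(nested_result, sub_after, safe_result, fresh_result)  # 8 5 9 4
```

Linking keeps `__repr__` able to walk through the nested steps; wrapping trades that visibility for isolation, since the wrapped node only shows up under the bound method's name, `process`.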


## 2. Implement open and build Methods

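Now let's create a pipeline that supports the open/build pattern: you "open" an object to get a modifiable working form, run the steps on it, and "build" the result into its final form. `BuildablePipeline` keeps the same linking structure as `Pipeline` but adds an optional `opener`, applied before the node's step, and an optional `builder`, applied after the rest of the chain has returned.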
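The protocol itself needs nothing more than two methods. Below is a minimal sketch assuming a hypothetical `FrozenRecord` class: `open()` hands out a mutable `dict` copy, and `build()` freezes an edited draft into a new record, using the standard library's `types.MappingProxyType` as the read-only view.

```python
from types import MappingProxyType


class FrozenRecord:
    """Hypothetical read-only record that follows the open()/build() protocol."""

    def __init__(self, data):
        # Copy the input, then expose it only through a read-only proxy
        self._data = MappingProxyType(dict(data))

    def open(self):
        # Hand out a mutable working copy; the record itself never changes
        return dict(self._data)

    @classmethod
    def build(cls, draft):
        # Freeze the edited draft into a new record
        return cls(draft)

    def __getitem__(self, key):
        return self._data[key]


record = FrozenRecord({"name": "john", "status": "active"})
draft = record.open()
draft["name"] = draft["name"].upper()
updated = FrozenRecord.build(draft)

print(record["name"], updated["name"])  # john JOHN

try:
    record._data["name"] = "x"  # the proxy rejects item assignment
except TypeError:
    print("original record is read-only")
```

To drive a record like this from the pipeline below, one option is `opener=lambda r: r.open()` and `builder=FrozenRecord.build`.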

In [3]:
class BuildablePipeline:
    """
    Simple pipeline with open/build pattern - same linking structure as Pipeline
    """
    def __init__(self, step=None, opener=None, builder=None):
        self.step = step
        self.opener = opener    # Function to "open" the object
        self.builder = builder  # Function to "build" the final object
        self.next = None
    
    def then(self, step_or_pipeline):
        """Add a step (function) or another pipeline to the chain"""
        if self.next is None:
            if callable(step_or_pipeline):
                self.next = BuildablePipeline(step_or_pipeline)
            else:
                self.next = step_or_pipeline
        else:
            self.next.then(step_or_pipeline)
        return self  # Always return root pipeline
    
    def process(self, obj):
        """Process an object through the pipeline"""
        # 1. Open the object (only nodes created with an opener; normally just the root)
        if self.opener is not None:
            obj = self.opener(obj)
        
        # 2. Apply this step
        if self.step is not None:
            obj = self.step(obj)
        
        # 3. Continue to next pipeline (recursive)
        if self.next is not None:
            obj = self.next.process(obj)
        
        # 4. Build the result after every downstream step has returned
        #    (only nodes created with a builder; normally just the root)
        if self.builder is not None:
            obj = self.builder(obj)
        
        return obj

# Example transformations for testing
def uppercase_values(data):
    """Transform all string values to uppercase"""
    result = {}
    for k, v in data.items():
        if isinstance(v, str):
            result[k] = v.upper()
        else:
            result[k] = v
    print(f"uppercase_values: {data} -> {result}")
    return result

def add_timestamp(data):
    """Add a timestamp to the data"""
    from datetime import datetime
    result = data.copy()
    result['timestamp'] = datetime.now().isoformat()
    print("add_timestamp: added timestamp")
    return result

# Test the buildable pipeline
def simple_opener(obj):
    print(f"Opening: {obj}")
    return obj.copy() if hasattr(obj, 'copy') else obj

def simple_builder(obj):
    print(f"Building final result: {obj}")
    return obj

# Create a simple buildable pipeline - same linking as section 1
buildable_pipeline = BuildablePipeline(add_timestamp, simple_opener, simple_builder) \
    .then(uppercase_values)

# Test data
test_data = {"name": "john", "email": "john@example.com", "status": "active"}
print("\nTesting BuildablePipeline:")
result = buildable_pipeline.process(test_data)
print("Final result:", result)


Testing BuildablePipeline:
Opening: {'name': 'john', 'email': 'john@example.com', 'status': 'active'}
add_timestamp: added timestamp
uppercase_values: {'name': 'john', 'email': 'john@example.com', 'status': 'active', 'timestamp': '2025-07-30T01:41:04.673103'} -> {'name': 'JOHN', 'email': 'JOHN@EXAMPLE.COM', 'status': 'ACTIVE', 'timestamp': '2025-07-30T01:41:04.673103'}
Building final result: {'name': 'JOHN', 'email': 'JOHN@EXAMPLE.COM', 'status': 'ACTIVE', 'timestamp': '2025-07-30T01:41:04.673103'}
Final result: {'name': 'JOHN', 'email': 'JOHN@EXAMPLE.COM', 'status': 'ACTIVE', 'timestamp': '2025-07-30T01:41:04.673103'}
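Since `builder` runs only after `self.next.process(obj)` returns, open/build scopes nest: a `BuildablePipeline` with its own `opener` and `builder`, linked into another one, opens and builds around just its own segment of the chain. This sketch repeats the class compactly and records the call order with a hypothetical `traced` helper.

```python
class BuildablePipeline:
    """Trimmed copy of the notebook's BuildablePipeline."""

    def __init__(self, step=None, opener=None, builder=None):
        self.step = step
        self.opener = opener
        self.builder = builder
        self.next = None

    def then(self, step_or_pipeline):
        if self.next is None:
            self.next = (BuildablePipeline(step_or_pipeline)
                         if callable(step_or_pipeline) else step_or_pipeline)
        else:
            self.next.then(step_or_pipeline)
        return self

    def process(self, obj):
        if self.opener is not None:
            obj = self.opener(obj)
        if self.step is not None:
            obj = self.step(obj)
        if self.next is not None:
            obj = self.next.process(obj)  # recurse into the rest of the chain
        if self.builder is not None:
            obj = self.builder(obj)       # runs only after everything downstream returns
        return obj


trace = []


def traced(label):
    """Hypothetical helper: an identity step that records when it runs."""
    def step(obj):
        trace.append(label)
        return obj
    return step


inner = BuildablePipeline(traced("inner.step"),
                          opener=traced("inner.open"),
                          builder=traced("inner.build"))
outer = BuildablePipeline(traced("outer.step"),
                          opener=traced("outer.open"),
                          builder=traced("outer.build")).then(inner)

outer.process({})
print(trace)
# ['outer.open', 'outer.step', 'inner.open', 'inner.step', 'inner.build', 'outer.build']
```

The builds unwind in reverse order of the opens, the same way nested `with` blocks exit.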


## 3. Complex File Operations Pipeline

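Let's build a more involved example around file operations. `FileProcessor.open()` creates a temporary directory, backs up the original file into it, and loads the content (JSON, CSV, or plain text, detected from the file extension) into a working dict; `build()` writes the working content back to the original path and deletes the temporary directory. Load errors are recorded as warnings in the processor's metadata rather than raised.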
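For comparison with the class below, here is a minimal sketch of the same open → edit → build cycle for a single JSON file, written as a generator-based context manager with `contextlib.contextmanager`. The helper name `opened_json` is hypothetical. Here the success-only write-back and the cleanup both fall out of the `with` statement, and the new content goes to a temporary file in the same directory that then replaces the target through `os.replace`, so an interruption during the write leaves the original path untouched. `FileProcessor.build()` instead writes to the target path directly.

```python
import json
import os
import shutil
import tempfile
from contextlib import contextmanager


@contextmanager
def opened_json(path):
    """Hypothetical helper: yield a working copy of a JSON file and write it back
    only if the with-block finishes without raising."""
    with open(path, encoding="utf-8") as f:
        working = json.load(f)
    yield working  # the caller edits `working` in place
    # Reached only on success: write a temp file next to the target, then swap it in
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(os.path.abspath(path)), suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(working, f, indent=2)
        os.replace(tmp_path, path)  # the os docs describe this rename as atomic on POSIX
    except BaseException:
        os.remove(tmp_path)
        raise


demo_dir = tempfile.mkdtemp()
path = os.path.join(demo_dir, "users.json")
with open(path, "w", encoding="utf-8") as f:
    json.dump({"users": [{"email": " John@Example.com "}]}, f)

# Successful edit: the change is written back
with opened_json(path) as data:
    for user in data["users"]:
        user["email"] = user["email"].strip().lower()

# Failed edit: the exception skips the write-back, so the file keeps its previous content
try:
    with opened_json(path) as data:
        data["users"].clear()
        raise ValueError("simulated failure in a transform")
except ValueError:
    pass

with open(path, encoding="utf-8") as f:
    result = json.load(f)
shutil.rmtree(demo_dir)
print(result)  # {'users': [{'email': 'john@example.com'}]}
```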

In [4]:
import csv
import json
import os
import shutil
import tempfile
from typing import Any, Dict

class FileProcessor:
    """
    Complex file processor that uses open/build pattern for safe file operations
    Supports multiple file formats and automatic cleanup
    """
    def __init__(self, file_path: str, backup_enabled: bool = True):
        self.file_path = file_path
        self.backup_enabled = backup_enabled
        self.temp_dir = None
        self.working_file = None
        self.backup_file = None
        self.is_open = False
        self.file_format = self._detect_format()
        self.metadata = {
            'operations': [],
            'created_temp_files': [],
            'warnings': []
        }
    
    def _detect_format(self) -> str:
        """Detect file format from extension"""
        ext = os.path.splitext(self.file_path)[1].lower()
        format_map = {
            '.json': 'json',
            '.csv': 'csv',
            '.txt': 'text',
            '.log': 'text'
        }
        return format_map.get(ext, 'text')
    
    def open(self) -> Dict[str, Any]:
        """
        Open file for processing - creates working copy and backup
        Returns a working data structure
        """
        if self.is_open:
            raise RuntimeError("FileProcessor is already open")
        
        print(f"📂 Opening file: {self.file_path}")
        
        # Create temporary directory for operations
        self.temp_dir = tempfile.mkdtemp(prefix="fileprocessor_")
        self.metadata['temp_dir'] = self.temp_dir
        
        # Create backup if enabled and file exists
        if self.backup_enabled and os.path.exists(self.file_path):
            self.backup_file = os.path.join(self.temp_dir, f"backup_{os.path.basename(self.file_path)}")
            shutil.copy2(self.file_path, self.backup_file)
            print(f"💾 Created backup: {self.backup_file}")
            self.metadata['operations'].append('backup_created')
        
        # Create working file
        self.working_file = os.path.join(self.temp_dir, f"working_{os.path.basename(self.file_path)}")
        
        # Load data based on format
        working_data = {'content': None, 'metadata': self.metadata}
        
        if os.path.exists(self.file_path):
            working_data['content'] = self._load_file_content()
        else:
            working_data['content'] = self._create_empty_content()
            self.metadata['warnings'].append('file_not_found_created_empty')
        
        self.is_open = True
        self.metadata['operations'].append('file_opened')
        
        print(f"✅ File opened successfully. Format: {self.file_format}")
        return working_data
    
    def _load_file_content(self) -> Any:
        """Load file content based on detected format"""
        try:
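            # newline='' is what the csv docs ask for; other formats keep universal newlines
            with open(self.file_path, 'r', encoding='utf-8',
                      newline='' if self.file_format == 'csv' else None) as f: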
                if self.file_format == 'json':
                    return json.load(f)
                elif self.file_format == 'csv':
                    reader = csv.DictReader(f)
                    return list(reader)
                else:  # text
                    return f.read().splitlines()
        except Exception as e:
            self.metadata['warnings'].append(f'load_error: {str(e)}')
            return self._create_empty_content()
    
    def _create_empty_content(self) -> Any:
        """Create empty content based on format"""
        if self.file_format == 'json':
            return {}
        elif self.file_format == 'csv':
            return []
        else:  # text
            return []
    
    def build(self, working_data: Dict[str, Any]) -> 'FileProcessor':
        """
        Build final file from working data and cleanup
        """
        if not self.is_open:
            raise RuntimeError("FileProcessor is not open")
        
        print(f"🔨 Building final file: {self.file_path}")
        
        # Save working data to final file
        self._save_file_content(working_data['content'])
        
        # Update metadata
        self.metadata = working_data['metadata']
        self.metadata['operations'].append('file_built')
        
        # Cleanup
        self._cleanup()
        
        print(f"✅ File built successfully")
        
        # Return new processor instance pointing to the updated file
        new_processor = FileProcessor(self.file_path, self.backup_enabled)
        new_processor.metadata = self.metadata.copy()
        return new_processor
    
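    def _save_file_content(self, content: Any):
        """Save content to file based on format"""
        # dirname is '' for a bare filename, and os.makedirs('') would raise
        directory = os.path.dirname(self.file_path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        
        # newline='' lets the csv module control line endings (no blank rows on Windows)
        with open(self.file_path, 'w', encoding='utf-8',
                  newline='' if self.file_format == 'csv' else None) as f:
            if self.file_format == 'json':
                json.dump(content, f, indent=2, ensure_ascii=False)
            elif self.file_format == 'csv':
                if content:
                    # Wrap non-dict rows, and take the union of keys across all rows,
                    # since a transform may add a key to only some rows
                    rows = [row if isinstance(row, dict) else {'value': row} for row in content]
                    fieldnames = list(dict.fromkeys(key for row in rows for key in row))
                    writer = csv.DictWriter(f, fieldnames=fieldnames)
                    writer.writeheader()
                    writer.writerows(rows)
            else:  # text
                if isinstance(content, list):
                    f.write('\n'.join(str(line) for line in content))
                else:
                    f.write(str(content))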
    
    def _cleanup(self):
        """Clean up temporary files"""
        if self.temp_dir and os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)
            print(f"🧹 Cleaned up temp directory: {self.temp_dir}")
        self.is_open = False
    
    def peek_content(self) -> Any:
        """Peek at file content without opening for modification"""
        if os.path.exists(self.file_path):
            return self._load_file_content()
        return self._create_empty_content()
    
    def __repr__(self):
        status = "open" if self.is_open else "closed"
        return f"FileProcessor('{self.file_path}', format={self.file_format}, status={status})"

# Create test files for demonstration
os.makedirs('test_files', exist_ok=True)

# Create sample JSON file
sample_json = {
    "users": [
        {"id": 1, "name": "John Doe", "email": "john@example.com", "status": "active"},
        {"id": 2, "name": "Jane Smith", "email": "jane@example.com", "status": "pending"}
    ],
    "metadata": {"version": "1.0", "created": "2025-07-30"}
}

with open('test_files/users.json', 'w') as f:
    json.dump(sample_json, f, indent=2)

# Create sample CSV file
sample_csv_data = [
    {"product": "laptop", "price": "1200", "category": "electronics"},
    {"product": "book", "price": "25", "category": "education"},
    {"product": "phone", "price": "800", "category": "electronics"}
]

with open('test_files/products.csv', 'w', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=['product', 'price', 'category'])
    writer.writeheader()
    writer.writerows(sample_csv_data)

print("📁 Created test files:")
print("- test_files/users.json")
print("- test_files/products.csv")

# Test basic file processor
processor = FileProcessor('test_files/users.json')
print("\nFile processor created:", processor)
print("Peek at content:", processor.peek_content())

📁 Created test files:
- test_files/users.json
- test_files/products.csv

File processor created: FileProcessor('test_files/users.json', format=json, status=closed)
Peek at content: {'users': [{'id': 1, 'name': 'John Doe', 'email': 'john@example.com', 'status': 'active'}, {'id': 2, 'name': 'Jane Smith', 'email': 'jane@example.com', 'status': 'pending'}], 'metadata': {'version': '1.0', 'created': '2025-07-30'}}
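A `csv.DictWriter` whose `fieldnames` come from the first row alone raises `ValueError` as soon as a later row carries an extra key, which is what `normalize_csv_prices` (in the next cell) produces when only some prices parse. The sample `products.csv` has only valid prices, so this notebook's run would not hit it. The sketch below reproduces the failure on hypothetical rows and shows one fix: collecting the union of keys across all rows, in first-seen order.

```python
import csv
import io

# Hypothetical rows shaped like normalize_csv_prices output: only rows whose
# price parsed successfully gain the extra 'price_normalized' key
rows = [
    {"product": "widget", "price": "n/a"},
    {"product": "laptop", "price": "1200.00", "price_normalized": True},
]

# Fieldnames taken from the first row only
error = None
try:
    writer = csv.DictWriter(io.StringIO(), fieldnames=list(rows[0].keys()))
    writer.writeheader()
    writer.writerows(rows)
except ValueError as exc:
    error = str(exc)
print(error)  # the message names the unexpected 'price_normalized' field


def union_fieldnames(rows):
    """Collect every key across all rows, in first-seen order."""
    seen = {}
    for row in rows:
        seen.update(dict.fromkeys(row))
    return list(seen)


buf = io.StringIO()
# Keys missing from a row are written as '' (DictWriter's default restval)
writer = csv.DictWriter(buf, fieldnames=union_fieldnames(rows))
writer.writeheader()
writer.writerows(rows)
print(buf.getvalue())
```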


In [6]:
# File Processing Pipeline that works with FileProcessor
class FileProcessingPipeline(BuildablePipeline):
    """
    Specialized pipeline for FileProcessor with automatic resource management
    """
    def __init__(self):
        super().__init__(
            opener=lambda file_processor: file_processor.open(),
            builder=lambda working_data: self.source_processor.build(working_data)
        )
        self.source_processor = None
    
    def add_transform(self, transform_func):
        """Add a transformation function to the pipeline"""
        return self.then(transform_func)
    
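    def process(self, file_processor: FileProcessor):
        """Process a FileProcessor through the pipeline, rolling back on failure"""
        self.source_processor = file_processor
        already_open = file_processor.is_open
        try:
            return super().process(file_processor)
        except Exception:
            # Roll back only a session this call opened: restore the original from the
            # backup (build() may have failed mid-write), then remove the temp directory
            if file_processor.is_open and not already_open:
                if file_processor.backup_file and os.path.exists(file_processor.backup_file):
                    shutil.copy2(file_processor.backup_file, file_processor.file_path)
                file_processor._cleanup()
            raise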

# Complex file transformation functions
def validate_json_structure(working_data):
    """Validate JSON file has required structure"""
    content = working_data['content']
    metadata = working_data['metadata']
    
    if not isinstance(content, dict):
        raise ValueError("JSON content must be a dictionary")
    
    required_keys = ['users', 'metadata']
    for key in required_keys:
        if key not in content:
            content[key] = [] if key == 'users' else {}
            metadata['warnings'].append(f'added_missing_key: {key}')
    
    metadata['operations'].append('json_structure_validated')
    print("✅ JSON structure validated")
    return working_data

def normalize_user_emails(working_data):
    """Normalize email addresses in user data"""
    content = working_data['content']
    metadata = working_data['metadata']
    
    if 'users' in content:
        for user in content['users']:
            if 'email' in user:
                old_email = user['email']
                user['email'] = user['email'].lower().strip()
                if old_email != user['email']:
                    print(f"📧 Normalized email: {old_email} -> {user['email']}")
    
    metadata['operations'].append('emails_normalized')
    return working_data

def add_user_metadata(working_data):
    """Add processing metadata to users"""
    content = working_data['content']
    metadata = working_data['metadata']
    
    if 'users' in content:
        for user in content['users']:
            user['processed_at'] = '2025-07-30T12:00:00Z'
            user['processor_version'] = '2.0'
    
    metadata['operations'].append('user_metadata_added')
    print("📝 Added user metadata")
    return working_data

def update_file_metadata(working_data):
    """Update file-level metadata"""
    content = working_data['content']
    metadata = working_data['metadata']
    
    if isinstance(content, dict) and 'metadata' in content:
        content['metadata']['last_processed'] = '2025-07-30T12:00:00Z'
        content['metadata']['processor'] = 'FileProcessingPipeline'
        content['metadata']['operations_count'] = len(metadata['operations'])
    
    metadata['operations'].append('file_metadata_updated')
    print("📊 Updated file metadata")
    return working_data

def backup_original_data(working_data):
    """Create a backup of original data in the working structure"""
    content = working_data['content']
    metadata = working_data['metadata']
    
    # Store original data reference
    if 'original_backup' not in metadata:
        metadata['original_backup'] = json.loads(json.dumps(content))  # Deep copy
        metadata['operations'].append('original_data_backed_up')
        print("💾 Created original data backup")
    
    return working_data

# CSV-specific transformations
def normalize_csv_prices(working_data):
    """Normalize price data in CSV"""
    content = working_data['content']
    metadata = working_data['metadata']
    
    if isinstance(content, list):
        for row in content:
            if 'price' in row:
                try:
                    # Convert to float and back to normalized string
                    price_val = float(row['price'])
                    row['price'] = f"{price_val:.2f}"
                    row['price_normalized'] = True
                except (TypeError, ValueError):  # TypeError: csv.DictReader fills short rows with None
                    metadata['warnings'].append(f'invalid_price: {row.get("price", "unknown")}')
    
    metadata['operations'].append('csv_prices_normalized')
    print("💰 Normalized CSV prices")
    return working_data

def add_csv_categories(working_data):
    """Add category analysis to CSV data"""
    content = working_data['content']
    metadata = working_data['metadata']
    
    if isinstance(content, list):
        categories = {}
        for row in content:
            category = row.get('category', 'unknown')
            categories[category] = categories.get(category, 0) + 1
        
        # Add category summary to metadata
        metadata['category_summary'] = categories
        metadata['operations'].append('csv_categories_analyzed')
        print(f"📊 Analyzed categories: {categories}")
    
    return working_data

# Create pipelines for different file types
json_processing_pipeline = FileProcessingPipeline() \
    .add_transform(backup_original_data) \
    .add_transform(validate_json_structure) \
    .add_transform(normalize_user_emails) \
    .add_transform(add_user_metadata) \
    .add_transform(update_file_metadata)

csv_processing_pipeline = FileProcessingPipeline() \
    .add_transform(backup_original_data) \
    .add_transform(normalize_csv_prices) \
    .add_transform(add_csv_categories)

print("\n=== Testing JSON File Processing Pipeline ===")
json_processor = FileProcessor('test_files/users.json')
print("Before processing:", json_processor.peek_content())

processed_json = json_processing_pipeline.process(json_processor)
print("\nAfter processing:", processed_json.peek_content())
print("Processing metadata:", processed_json.metadata)


=== Testing JSON File Processing Pipeline ===
Before processing: {'users': [{'id': 1, 'name': 'John Doe', 'email': 'john@example.com', 'status': 'active'}, {'id': 2, 'name': 'Jane Smith', 'email': 'jane@example.com', 'status': 'pending'}], 'metadata': {'version': '1.0', 'created': '2025-07-30'}}
📂 Opening file: test_files/users.json
💾 Created backup: C:\Users\flash\AppData\Local\Temp\fileprocessor_7xe5shsn\backup_users.json
✅ File opened successfully. Format: json
💾 Created original data backup
✅ JSON structure validated
📝 Added user metadata
📊 Updated file metadata
🔨 Building final file: test_files/users.json
🧹 Cleaned up temp directory: C:\Users\flash\AppData\Local\Temp\fileprocessor_7xe5shsn
✅ File built successfully

After processing: {'users': [{'id': 1, 'name': 'John Doe', 'email': 'john@example.com', 'status': 'active', 'processed_at': '2025-07-30T12:00:00Z', 'processor_version': '2.0'}, {'id': 2, 'name': 'Jane Smith', 'email': 'jane@example.com', 'status': 'pending', 'proces