# Real-World Use Cases

**Duration:** 40 minutes  
**Level:** Advanced

Complete, practical examples you can adapt for your projects.

## What You'll Learn

- Automated backup system
- Multi-environment deployment
- Report generation pipeline
- File upload processing
- Log aggregation
- Data migration (as an exercise)

These are production-ready patterns! 💼

In [None]:
from genro_storage import StorageManager
import tempfile
import os
from datetime import datetime, timedelta
import json

storage = StorageManager()
temp_dir = tempfile.mkdtemp()

storage.configure([
    {'name': 'local', 'type': 'local', 'path': temp_dir},
    {'name': 'mem', 'type': 'memory'}
])

print("✓ Storage ready")

## 1. Automated Backup System

Complete backup solution with rotation:

In [None]:
class BackupManager:
    """Automated backup with rotation and verification"""
    
    def __init__(self, storage, source_path, backup_base, max_backups=7):
        self.storage = storage
        self.source = storage.node(source_path)
        self.backup_base = storage.node(backup_base)
        self.max_backups = max_backups
    
    def create_backup(self):
        """Create timestamped backup"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_dir = self.backup_base.child(timestamp)
        backup_dir.mkdir(parents=True, exist_ok=True)
        
        print(f"Creating backup: {timestamp}")
        
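        # skip='hash' skips files whose destination copy already matches.
        # Each backup goes to a fresh timestamped directory, so normally
        # nothing is skipped unless a backup is re-run into an existing one.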
        stats = {'copied': 0, 'skipped': 0, 'bytes': 0}
        
        def on_copy(src, dst):
            stats['copied'] += 1
            stats['bytes'] += src.size
        
        def on_skip(src, dst):
            stats['skipped'] += 1
        
        self.source.copy(
            backup_dir,
            skip='hash',
            on_file=on_copy,
            on_skip=on_skip
        )
        
        # Create manifest
        manifest = {
            'timestamp': timestamp,
            'source': self.source.fullpath,
            'files_copied': stats['copied'],
            'files_skipped': stats['skipped'],
            'bytes_copied': stats['bytes']
        }
        
        manifest_file = backup_dir.child('backup_manifest.json')
        manifest_file.write_text(json.dumps(manifest, indent=2))
        
        print(f"  ✓ Copied: {stats['copied']} files ({stats['bytes']} bytes)")
        print(f"  ⊘ Skipped: {stats['skipped']} files")
        
        return backup_dir
    
    def rotate_backups(self):
        """Remove old backups"""
        # Sort by name: timestamped directory names order chronologically
        backups = sorted(
            (c for c in self.backup_base.children() if c.isdir),
            key=lambda c: c.basename
        )
        
        if len(backups) > self.max_backups:
            to_remove = backups[:-self.max_backups]
            print(f"\nRemoving {len(to_remove)} old backups")
            for backup in to_remove:
                print(f"  Deleting: {backup.basename}")
                backup.delete()
    
    def list_backups(self):
        """List all backups with info"""
        # Sort by name: timestamped directory names order chronologically
        backups = sorted(
            (c for c in self.backup_base.children() if c.isdir),
            key=lambda c: c.basename
        )
        
        print(f"\nAvailable backups ({len(backups)}):")
        for backup in backups:
            manifest_file = backup.child('backup_manifest.json')
            if manifest_file.exists:
                manifest = json.loads(manifest_file.read_text())
                print(f"  {backup.basename}: {manifest['files_copied']} files")
            else:
                print(f"  {backup.basename}")

# Example usage
print("Example: Backup system")

# Create test data
data_dir = storage.node('local:important_data')
data_dir.mkdir()
for i in range(5):
    data_dir.child(f'file_{i}.txt').write_text(f'Important data {i}')

# Setup backup manager
bm = BackupManager(
    storage,
    'local:important_data',
    'local:backups',
    max_backups=3
)

# Create first backup
bm.create_backup()

# Modify data
data_dir.child('file_2.txt').write_text('Modified data')

# Create second backup
import time
time.sleep(1)  # Ensure different timestamp
bm.create_backup()

# List backups
bm.list_backups()

# Rotate (if needed)
bm.rotate_backups()
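
The manifest written by `create_backup` records file counts and byte totals, which is not enough to detect a missing or corrupted file later. The following is a minimal sketch of a per-file checksum manifest. It uses only the standard library on a local directory rather than `genro_storage` nodes, and `file_sha256`, `build_manifest` and `verify_backup` are hypothetical helper names, not part of the library:

```python
import hashlib
from pathlib import Path


def file_sha256(path: Path) -> str:
    """Hash a file in chunks so large files are not read into memory at once."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()


def build_manifest(root: Path) -> dict:
    """Map each file's path (relative to root) to its SHA-256 digest."""
    return {
        p.relative_to(root).as_posix(): file_sha256(p)
        for p in sorted(root.rglob("*"))
        if p.is_file()
    }


def verify_backup(root: Path, manifest: dict) -> list:
    """Return the relative paths that are missing or whose content changed."""
    problems = []
    for rel_path, expected in manifest.items():
        candidate = root / rel_path
        if not candidate.is_file() or file_sha256(candidate) != expected:
            problems.append(rel_path)
    return problems
```

One way to use this is to build the manifest from the source directory before copying, save it outside the backup directory, and run `verify_backup` against the backup as a restore test.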

## 2. Multi-Environment Deployment

Deploy configs to dev/staging/prod:

In [None]:
class DeploymentManager:
    """Deploy configurations to multiple environments"""
    
    def __init__(self, storage):
        self.storage = storage
        self.environments = {
            'dev': 'mem:deploy/dev',
            'staging': 'mem:deploy/staging',
            'prod': 'mem:deploy/prod'
        }
    
    def deploy_config(self, config_node, env, dry_run=False):
        """Deploy configuration to environment"""
        if env not in self.environments:
            raise ValueError(f"Unknown environment: {env}")
        
        dest_path = self.environments[env]
        dest = self.storage.node(dest_path)
        dest.parent.mkdir(parents=True, exist_ok=True)
        
        print(f"\nDeploying to {env}:")
        
        if dry_run:
            print(f"  [DRY RUN] Would copy {config_node.fullpath}")
            print(f"            to {dest.fullpath}")
            return
        
        # Backup current if exists
        if dest.exists:
            backup = self.storage.node(f"{dest_path}.backup")
            dest.copy(backup)
            print("  ✓ Backed up current config")
        
        # Deploy
        config_node.copy(dest)
        print(f"  ✓ Deployed: {config_node.size} bytes")
        
        # Verify
        if config_node.md5hash == dest.md5hash:
            print("  ✓ Verified: checksums match")
        else:
            raise RuntimeError("Deployment verification failed!")
    
    def promote(self, from_env, to_env, dry_run=False):
        """Promote config from one env to another"""
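        # Fail with a clear message instead of a bare KeyError
        for env in (from_env, to_env):
            if env not in self.environments:
                raise ValueError(f"Unknown environment: {env}")
        
        source_path = self.environments[from_env]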
        source = self.storage.node(source_path)
        
        if not source.exists:
            raise FileNotFoundError(f"No config in {from_env}")
        
        print(f"\nPromoting: {from_env} -> {to_env}")
        self.deploy_config(source, to_env, dry_run=dry_run)
    
    def rollback(self, env):
        """Rollback to backup"""
        dest_path = self.environments[env]
        dest = self.storage.node(dest_path)
        backup = self.storage.node(f"{dest_path}.backup")
        
        if not backup.exists:
            raise FileNotFoundError("No backup available")
        
        print(f"\nRolling back {env}:")
        backup.copy(dest)
        print("  ✓ Restored from backup")

# Example usage
dm = DeploymentManager(storage)

# Create config
config = storage.node('mem:configs/app_config.json')
config.parent.mkdir(parents=True, exist_ok=True)
config.write_text(json.dumps({
    'version': '1.0',
    'timeout': 30
}, indent=2))

# Deploy to dev (test first)
dm.deploy_config(config, 'dev', dry_run=True)
dm.deploy_config(config, 'dev')

# Test, then promote to staging
dm.promote('dev', 'staging')

# Test, then promote to prod
dm.promote('staging', 'prod')

print("\n✓ Deployment pipeline complete")
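
`deploy_config` raises when the checksum comparison fails, but it leaves the newly copied file in place, and `rollback` has to be called separately. The sketch below shows one way to combine backup, deploy, verify and automatic restore in a single step. It works on local paths with the standard library only, uses SHA-256 where the tutorial uses `md5hash`, and `deploy_with_rollback` is a hypothetical name rather than a `genro_storage` API:

```python
import hashlib
import shutil
from pathlib import Path


def digest_of(path: Path) -> str:
    """Return the SHA-256 hex digest of a (small) file."""
    return hashlib.sha256(path.read_bytes()).hexdigest()


def deploy_with_rollback(source: Path, dest: Path) -> None:
    """Copy source to dest, restoring the previous dest if anything fails."""
    backup = dest.with_name(dest.name + ".backup")
    had_previous = dest.exists()
    if had_previous:
        shutil.copy2(dest, backup)

    dest.parent.mkdir(parents=True, exist_ok=True)
    try:
        shutil.copy2(source, dest)
        if digest_of(source) != digest_of(dest):
            raise RuntimeError("Deployment verification failed")
    except Exception:
        # Put the environment back the way it was, then re-raise
        if had_previous:
            shutil.copy2(backup, dest)
        elif dest.exists():
            dest.unlink()
        raise
```

The caller still sees the original exception, so a failed deployment is reported rather than hidden, but the environment is not left holding an unverified file.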

## 3. Report Generation Pipeline

Generate reports from multiple data sources:

In [None]:
class ReportGenerator:
    """Generate reports from data sources"""
    
    def __init__(self, storage):
        self.storage = storage
    
    def generate_daily_report(self, date=None):
        """Generate daily business report"""
        if date is None:
            date = datetime.now()
        
        date_str = date.strftime('%Y-%m-%d')
        print(f"Generating report for {date_str}...")
        
        # Build report using iternode
        builder = self.storage.iternode()
        
        # Header
        header = self.storage.node('mem:_temp_header')
        header.write_text(f"""
=====================================
DAILY BUSINESS REPORT
Date: {date_str}
=====================================

""")
        builder.append(header)
        
        # Sales section
        sales_data = self._get_sales_data(date)
        sales_section = self.storage.node('mem:_temp_sales')
        sales_section.write_text(f"""
SALES SUMMARY
-------------
Total Sales: ${sales_data['total']:,.2f}
Orders: {sales_data['orders']}
Average Order: ${sales_data['average']:,.2f}

""")
        builder.append(sales_section)
        
        # Users section
        user_data = self._get_user_data(date)
        user_section = self.storage.node('mem:_temp_users')
        user_section.write_text(f"""
USER ACTIVITY
-------------
New Users: {user_data['new_users']}
Active Users: {user_data['active']}
Total Users: {user_data['total']}

""")
        builder.append(user_section)
        
        # Footer
        footer = self.storage.node('mem:_temp_footer')
        footer.write_text(f"""
=====================================
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
=====================================
""")
        builder.append(footer)
        
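        # Save report
        report_file = self.storage.node(f'mem:reports/daily_{date_str}.txt')
        report_file.parent.mkdir(parents=True, exist_ok=True)
        builder.copy(report_file)
        
        # Remove the temporary section nodes now that the report is written
        for temp_node in (header, sales_section, user_section, footer):
            temp_node.delete()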
        
        print(f"  ✓ Report saved: {report_file.fullpath}")
        print(f"  ✓ Size: {report_file.size} bytes")
        
        return report_file
    
    def _get_sales_data(self, date):
        """Simulate getting sales data"""
        return {
            'total': 15234.50,
            'orders': 87,
            'average': 175.11
        }
    
    def _get_user_data(self, date):
        """Simulate getting user data"""
        return {
            'new_users': 12,
            'active': 543,
            'total': 8921
        }

# Generate report
rg = ReportGenerator(storage)
report = rg.generate_daily_report()

print("\nReport content:")
print(report.read_text())

## 4. File Upload Processing

Process uploaded files (validation, storage, metadata):

In [None]:
class UploadProcessor:
    """Process file uploads with validation"""
    
    def __init__(self, storage, upload_dir, max_size_mb=10):
        self.storage = storage
        self.upload_dir = upload_dir
        self.max_size = max_size_mb * 1024 * 1024
        
        self.allowed_types = {
            'image/jpeg', 'image/png', 'image/gif',
            'application/pdf',
            'text/plain'
        }
    
    def process_upload(self, file_node, user_id):
        """Process an uploaded file"""
        print(f"\nProcessing upload: {file_node.basename}")
        
        # Validate size
        if file_node.size > self.max_size:
            raise ValueError(f"File too large: {file_node.size} bytes")
        print(f"  ✓ Size ok: {file_node.size} bytes")
        
        # Validate type
        if file_node.mimetype not in self.allowed_types:
            raise ValueError(f"Type not allowed: {file_node.mimetype}")
        print(f"  ✓ Type ok: {file_node.mimetype}")
        
        # Generate a unique filename: timestamp + content hash prefix + original name
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        hash_part = file_node.md5hash[:8]
        safe_name = f"{timestamp}_{hash_part}_{file_node.basename}"
        
        # Store in user directory
        dest_path = f"{self.upload_dir}/{user_id}/{safe_name}"
        dest = self.storage.node(dest_path)
        dest.parent.mkdir(parents=True, exist_ok=True)
        
        file_node.copy(dest)
        print(f"  ✓ Stored: {dest.fullpath}")
        
        # Create metadata
        metadata = {
            'original_name': file_node.basename,
            'size': file_node.size,
            'mime_type': file_node.mimetype,
            'md5': file_node.md5hash,
            'uploaded_at': datetime.now().isoformat(),
            'user_id': user_id
        }
        
        meta_file = self.storage.node(f"{dest_path}.meta.json")
        meta_file.write_text(json.dumps(metadata, indent=2))
        print("  ✓ Metadata saved")
        
        return dest
    
    def list_user_uploads(self, user_id):
        """List all uploads for a user"""
        user_dir = self.storage.node(f"{self.upload_dir}/{user_id}")
        
        if not user_dir.exists:
            return []
        
        uploads = []
        for child in user_dir.children():
            if child.suffix == '.json':
                continue
            
            meta_file = self.storage.node(f"{child.fullpath}.meta.json")
            if meta_file.exists:
                metadata = json.loads(meta_file.read_text())
                uploads.append({
                    'file': child,
                    'metadata': metadata
                })
        
        return uploads

# Example usage
processor = UploadProcessor(storage, 'mem:uploads', max_size_mb=5)

# Simulate uploaded files
upload1 = storage.node('mem:temp/photo.jpg')
upload1.parent.mkdir(parents=True, exist_ok=True)
upload1.write_bytes(b'\xFF\xD8\xFF' + b'fake jpeg data' * 100)

upload2 = storage.node('mem:temp/document.pdf')
upload2.write_text('PDF content here...')

# Process uploads
processor.process_upload(upload1, user_id='user123')
processor.process_upload(upload2, user_id='user123')

# List user uploads
uploads = processor.list_user_uploads('user123')
print(f"\nUser has {len(uploads)} uploads:")
for upload in uploads:
    meta = upload['metadata']
    print(f"  - {meta['original_name']} ({meta['size']} bytes)")
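
The stored name in `process_upload` embeds `file_node.basename` unchanged, so any unusual characters in a client-supplied name carry through to the destination path. A minimal sketch of a sanitizing step that could run before the name is built follows. `sanitize_filename` is a hypothetical helper, and the allow-list of characters is an assumption to adapt to your backend:

```python
import re
import unicodedata


def sanitize_filename(name: str, max_length: int = 100) -> str:
    """Reduce a client-supplied name to a conservative, path-free file name."""
    # Keep only the last path component, whichever separator the client used
    name = name.replace("\\", "/").rsplit("/", 1)[-1]
    # Fold accented characters to ASCII where a decomposition exists
    name = unicodedata.normalize("NFKD", name).encode("ascii", "ignore").decode("ascii")
    # Replace anything outside a small allow-list with an underscore
    name = re.sub(r"[^A-Za-z0-9._-]", "_", name)
    # Avoid hidden files and names made only of dots
    name = name.lstrip(".")
    # Note: plain truncation can cut off the extension of very long names
    return name[:max_length] or "unnamed"
```

For example, `sanitize_filename("../../etc/passwd")` yields `passwd`, and an empty or dots-only name falls back to `unnamed`.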

## 5. Log Aggregation

Collect and merge logs from multiple sources:

In [None]:
class LogAggregator:
    """Aggregate logs from multiple servers"""
    
    def __init__(self, storage):
        self.storage = storage
    
    def aggregate_logs(self, server_ids, date=None):
        """Collect logs from all servers"""
        if date is None:
            date = datetime.now()
        
        date_str = date.strftime('%Y-%m-%d')
        print(f"Aggregating logs for {date_str}...")
        
        # Collect all log nodes
        log_nodes = []
        for server_id in server_ids:
            log_path = f'mem:servers/{server_id}/logs/{date_str}.log'
            log_node = self.storage.node(log_path)
            
            if log_node.exists:
                log_nodes.append(log_node)
                print(f"  Found log: {server_id} ({log_node.size} bytes)")
        
        if not log_nodes:
            print("  No logs found")
            return None
        
        # Use iternode for lazy concatenation
        aggregated = self.storage.iternode(*log_nodes)
        
        # Save aggregated logs
        output = self.storage.node(f'mem:aggregated_logs/{date_str}.log')
        output.parent.mkdir(parents=True, exist_ok=True)
        aggregated.copy(output)
        
        print(f"\n✓ Aggregated {len(log_nodes)} log files")
        print(f"✓ Total size: {output.size} bytes")
        
        return output
    
    def archive_old_logs(self, days_old=30):
        """Archive logs older than N days"""
        cutoff = datetime.now() - timedelta(days=days_old)
        print(f"Archiving logs older than {cutoff.date()}...")
        
        logs_dir = self.storage.node('mem:aggregated_logs')
        if not logs_dir.exists:
            return
        
        # Find old logs
        old_logs = []
        for log_file in logs_dir.children():
            if log_file.mtime < cutoff.timestamp():
                old_logs.append(log_file)
        
        if not old_logs:
            print("  No old logs to archive")
            return
        
        # Create archive
        archive_name = f"logs_archive_{datetime.now().strftime('%Y%m%d')}.zip"
        archive_builder = self.storage.iternode(*old_logs)
        
        archive_node = self.storage.node(f'mem:archives/{archive_name}')
        archive_node.parent.mkdir(parents=True, exist_ok=True)
        archive_node.write_bytes(archive_builder.zip())
        
        print(f"  ✓ Archived {len(old_logs)} files to {archive_name}")
        print(f"  ✓ Archive size: {archive_node.size} bytes")
        
        # Delete old logs
        for log_file in old_logs:
            log_file.delete()
        
        print(f"  ✓ Deleted {len(old_logs)} old log files")

# Create test logs
servers = ['web01', 'web02', 'api01']
date_str = datetime.now().strftime('%Y-%m-%d')

for server in servers:
    log = storage.node(f'mem:servers/{server}/logs/{date_str}.log')
    log.parent.mkdir(parents=True, exist_ok=True)
    log.write_text(f"[{server}] Log entries here...\n" * 10)

# Aggregate
aggregator = LogAggregator(storage)
result = aggregator.aggregate_logs(servers)

print("\nAggregated log sample:")
print(result.read_text()[:200] + "...")
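
The idea behind lazy concatenation can be illustrated without `genro_storage`: read each input in fixed-size chunks and write them straight to the output, so memory use stays flat however large the logs are. This is a standard-library sketch of the concept, not a description of how `iternode` is implemented, and `iter_chunks` and `concat_files` are hypothetical names:

```python
from pathlib import Path
from typing import Iterable, Iterator


def iter_chunks(paths: Iterable[Path], chunk_size: int = 65536) -> Iterator[bytes]:
    """Yield the contents of each file in order, one chunk at a time."""
    for path in paths:
        with path.open("rb") as fh:
            while True:
                chunk = fh.read(chunk_size)
                if not chunk:
                    break
                yield chunk


def concat_files(paths: Iterable[Path], output: Path) -> int:
    """Stream all inputs into output and return the number of bytes written."""
    written = 0
    with output.open("wb") as out:
        for chunk in iter_chunks(paths):
            written += out.write(chunk)
    return written
```

Nothing is read until the generator is consumed, and only one chunk is held in memory at a time.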

## 6. Try It Yourself! 🎯

**Exercise 1:** Build a photo gallery system:

In [None]:
class PhotoGallery:
    """
    Photo gallery with:
    - Upload and validation
    - Thumbnail generation (multiple sizes)
    - Album organization
    - Metadata (EXIF)
    """
    # Your code here
    pass

**Exercise 2:** Create a data migration tool:

In [None]:
class DataMigrator:
    """
    Migrate data from one storage to another:
    - Verify checksums
    - Progress tracking
    - Resume capability
    - Rollback on error
    """
    # Your code here
    pass

**Exercise 3:** Build a document workflow system:

In [None]:
class DocumentWorkflow:
    """
    Document workflow with:
    - Upload drafts
    - Review and approve
    - Version tracking
    - Final publication
    - Archive old versions
    """
    # Your code here
    pass

## 7. Cleanup

In [None]:
import shutil

if os.path.exists(temp_dir):
    shutil.rmtree(temp_dir)

print("✓ Cleanup complete")

## Summary

You've seen production-ready patterns:

- ✓ Automated backup with rotation
- ✓ Multi-environment deployment
- ✓ Report generation pipeline
- ✓ File upload processing
- ✓ Log aggregation and archiving

## Key Patterns

**Backup Systems:**
- Timestamped directories
- Incremental with `skip='hash'`
- Manifest files for verification
- Rotation to limit storage

**Deployment:**
- Dry run before execution
- Backup before deploy
- Checksum verification
- Rollback capability

**Processing:**
- Validation before storage
- Safe filename generation
- Metadata tracking
- Progress callbacks
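
Validation before storage can go one step further than comparing a declared MIME type against an allow-list. If the declared type is derived from the file name (an assumption about the backend, not something this tutorial states), a renamed file would pass. The sketch below compares the first bytes of the content against well-known file signatures. `SIGNATURES` and `content_matches_type` are hypothetical names:

```python
# Leading byte signatures for the binary types the tutorial allows
SIGNATURES = {
    "image/jpeg": (b"\xff\xd8\xff",),
    "image/png": (b"\x89PNG\r\n\x1a\n",),
    "image/gif": (b"GIF87a", b"GIF89a"),
    "application/pdf": (b"%PDF-",),
}


def content_matches_type(head: bytes, declared_type: str) -> bool:
    """Check the first bytes of a file against the declared MIME type."""
    signatures = SIGNATURES.get(declared_type)
    if signatures is None:
        # No signature known (e.g. text/plain): nothing to compare against
        return True
    return head.startswith(signatures)
```

With this check, the simulated `document.pdf` from the upload example would be rejected, because its content does not start with `%PDF-`.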

**Aggregation:**
- iternode for lazy concat
- ZIP for archiving
- Time-based cleanup
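
For time-based cleanup, it is safer to confirm the archive is readable before deleting the originals. The sketch below does this with the standard library on a local directory. `archive_old_files` is a hypothetical helper, not a `genro_storage` API, and it assumes the archive path lies outside the directory being cleaned:

```python
import time
import zipfile
from pathlib import Path


def archive_old_files(directory: Path, archive: Path, days_old: int = 30) -> list:
    """Zip files older than days_old, then delete them once the archive checks out."""
    cutoff = time.time() - days_old * 86400
    old_files = sorted(
        p for p in directory.iterdir()
        if p.is_file() and p.stat().st_mtime < cutoff
    )
    if not old_files:
        return []

    with zipfile.ZipFile(archive, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for path in old_files:
            zf.write(path, arcname=path.name)

    # Re-open the archive and check every member's CRC before removing anything
    with zipfile.ZipFile(archive) as zf:
        if zf.testzip() is not None:
            raise RuntimeError("Archive is corrupt; originals were kept")

    for path in old_files:
        path.unlink()
    return [p.name for p in old_files]
```

The function returns the names it archived, which makes it easy to log or assert on in a scheduled job.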

## Production Tips

✅ **Do:**
- Always validate inputs
- Use dry run for critical operations
- Track with manifests/metadata
- Implement rollback
- Monitor with callbacks
- Test backup restore

❌ **Don't:**
- Deploy without testing
- Skip verification
- Forget error handling
- Leave temp files
- Ignore edge cases

## Congratulations! 🎉

You've completed all tutorials and are ready to:

- Build production storage systems
- Integrate with external tools
- Deploy to multiple environments
- Process files at scale
- Implement robust backups

## Next Steps

- Review [API Reference](../docs/api_reference.rst)
- Check [Examples](../docs/examples.rst)
- Read [Advanced Features](../docs/advanced.rst)
- Join the community

**Happy building!** 🚀