# Pipeline Integration Tutorial for Developers

This tutorial covers advanced usage of the embryo metadata annotation system for pipeline developers and bioinformaticians.

## Architecture Overview

- **EmbryoMetadata**: Main class for persistent annotation storage
- **AnnotationBatch**: Temporary workspace for batch operations
- **Pipeline Script**: `07_embryo_metadata_update.py` for automated processing
- **BaseFileHandler**: Atomic file operations with backup/recovery
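This tutorial does not cover the internals of `BaseFileHandler`. As a rough illustration of what "atomic file operations with backup/recovery" usually means, the sketch below does three things:

- keeps a `.bak` copy of the previous file,
- writes the new JSON to a temporary file in the same directory,
- swaps the new file into place with `os.replace`.

`atomic_write_json` is a hypothetical helper and is not part of the annotation package.

```python
import json
import os
import shutil
import tempfile
from pathlib import Path


def atomic_write_json(path, data):
    """Hypothetical helper: write JSON atomically, keeping a .bak of the old file."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)

    # Keep a backup of the current version so it can be restored if needed.
    backup = path.with_name(path.name + ".bak")
    if path.exists():
        shutil.copy2(path, backup)

    # Write to a temp file in the SAME directory: os.replace is only atomic
    # when source and destination are on the same filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as fh:
            json.dump(data, fh, indent=2)
            fh.flush()
            os.fsync(fh.fileno())  # make sure bytes hit disk before the swap
        os.replace(tmp_name, path)  # readers see either the old or the new file
    except BaseException:
        os.unlink(tmp_name)  # clean up the partial temp file, then re-raise
        raise
    return path
```

Because the final step is a rename, a crash mid-write leaves the previous file untouched rather than half-written.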

## Pipeline Script Integration

The `07_embryo_metadata_update.py` script integrates with your existing pipeline:

In [None]:
# Example: Using the pipeline script
import subprocess
import sys

# Initialize metadata from a SAM2 annotation file.
# sys.executable runs the script with the same Python interpreter as this notebook.
result = subprocess.run([
    sys.executable, 'scripts/pipelines/07_embryo_metadata_update.py',
    'path/to/sam2_annotations.json',
    '--output-dir', 'data/'
], capture_output=True, text=True)

print("Script output:")
print(result.stdout)

if result.returncode == 0:
    print("✅ Metadata initialization successful")
else:
    print(f"❌ Script failed: {result.stderr}")
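In a longer pipeline you may want a failed step to stop the run instead of only printing an error. The sketch below wraps `subprocess.run` and raises on a non-zero exit code or a timeout. `run_pipeline_step` and its `timeout` default are hypothetical. The script's command-line options beyond those shown above are not documented here.

```python
import subprocess
import sys


def run_pipeline_step(args, timeout=3600):
    """Hypothetical helper: run a Python pipeline command, raise on failure."""
    cmd = [sys.executable, *args]  # same interpreter as the notebook/kernel
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError(f"Timed out after {timeout}s: {' '.join(cmd)}") from exc
    if result.returncode != 0:
        raise RuntimeError(
            f"Command failed with exit code {result.returncode}:\n{result.stderr}"
        )
    return result.stdout


# Usage with the script from this tutorial (paths are placeholders):
# stdout = run_pipeline_step([
#     "scripts/pipelines/07_embryo_metadata_update.py",
#     "path/to/sam2_annotations.json",
#     "--output-dir", "data/",
# ])
```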

## Advanced API Usage

### Direct Snip Manipulation

For precise control, use the snip ID approach:

In [None]:
import sys
sys.path.append('scripts')

from annotations.embryo_metadata import EmbryoMetadata
from annotations.annotation_batch import AnnotationBatch

# Load metadata
metadata = EmbryoMetadata('path/to/your_sam2_file.json')

# Direct snip ID manipulation for precise control
specific_snips = [
    "20240418_A01_e01_s0100",
    "20240418_A01_e01_s0150", 
    "20240418_A01_e02_s0200"
]

result = metadata.add_phenotype(
    phenotype="BLUR",
    author="pipeline_script",
    snip_ids=specific_snips
)

print(f"Applied BLUR to {result['count']} specific snips")
print(f"Approach: {result['approach']}")
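The snip IDs above appear to follow a `<date>_<well>_e<embryo>_s<frame>` pattern. This pattern is inferred from the examples and the embryo IDs used later in this tutorial; it is not taken from a documented specification. Assuming it holds, a small parser can do two jobs before you call `add_phenotype`:

- group a list of snip IDs by embryo,
- recover the frame number from each snip ID.

`parse_snip_id` and `group_by_embryo` are hypothetical helpers.

```python
import re
from collections import defaultdict

# ASSUMED format, inferred from IDs such as "20240418_A01_e01_s0100":
# <YYYYMMDD>_<well>_e<embryo number>_s<frame number>
SNIP_ID_RE = re.compile(
    r"^(?P<date>\d{8})_(?P<well>[A-Z]\d{2})_e(?P<embryo>\d+)_s(?P<frame>\d+)$"
)


def parse_snip_id(snip_id):
    """Hypothetical helper: split a snip ID into embryo ID and frame number."""
    match = SNIP_ID_RE.match(snip_id)
    if match is None:
        raise ValueError(f"Unrecognized snip ID: {snip_id!r}")
    parts = match.groupdict()
    return {
        "embryo_id": f"{parts['date']}_{parts['well']}_e{parts['embryo']}",
        "frame": int(parts["frame"]),  # "0150" -> 150
    }


def group_by_embryo(snip_ids):
    """Map each embryo ID to the sorted frame numbers of its snips."""
    frames = defaultdict(list)
    for snip_id in snip_ids:
        info = parse_snip_id(snip_id)
        frames[info["embryo_id"]].append(info["frame"])
    return {embryo_id: sorted(f) for embryo_id, f in frames.items()}
```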

### Batch Processing Workflow

Use AnnotationBatch for safe, isolated operations:

In [None]:
# Create a batch workspace
batch = metadata.initialize_batch(
    mode="skeleton",  # Empty annotations, preserve structure
    author="automated_pipeline"
)

print(f"Created batch with {len(batch.data['embryos'])} embryos")
print(f"Batch author: {batch.author}")
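Conceptually, `mode="skeleton"` gives you a copy that keeps the embryo/snip structure but starts with empty annotations. Nothing you add to it touches `metadata.data` until `apply_batch` runs.

The sketch below illustrates that idea on a plain dictionary. The schema is an assumption based on the export code later in this tutorial: `embryos` → `snips` → `phenotypes`, plus an embryo-level `genotype`. `make_skeleton` is hypothetical and is not how `initialize_batch` is implemented.

```python
import copy


def make_skeleton(data):
    """Hypothetical: deep-copy the structure and clear annotation fields."""
    skeleton = copy.deepcopy(data)  # edits to the copy never reach the original
    for embryo in skeleton.get("embryos", {}).values():
        embryo["genotype"] = {}  # ASSUMED embryo-level annotation field
        for snip in embryo.get("snips", {}).values():
            snip["phenotypes"] = []  # ASSUMED frame-level annotation field
    return skeleton
```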

In [None]:
# Add annotations to batch (isolated from main data)
embryo_ids = list(batch.data["embryos"].keys())[:5]  # First 5 embryos

for i, embryo_id in enumerate(embryo_ids):
    # Batch operations inherit author automatically
    batch.add_phenotype("NORMAL", embryo_id=embryo_id, target="all")
    
    # Add genotypes based on experimental design
    if i % 2 == 0:
        batch.add_genotype("tmem67", embryo_id=embryo_id, zygosity="homozygous")
    else:
        batch.add_genotype("WT", embryo_id=embryo_id, zygosity="homozygous")

print(f"Added annotations to {len(embryo_ids)} embryos in batch")

In [None]:
# Preview batch contents
print(batch.preview(limit=3))

In [None]:
# Apply batch with conflict resolution
report = metadata.apply_batch(
    batch,
    on_conflict="merge",  # Combine batch annotations with existing ones
    dry_run=True  # Report what would change without modifying metadata
)

print("Dry run results:")
print(f"Would apply: {report['applied_count']} annotations")
print(f"Would skip: {report['skipped_count']} conflicts")
print(f"Errors: {len(report['errors'])}")

if not report['errors']:
    # The dry run was clean, so apply the batch for real
    final_report = metadata.apply_batch(batch, on_conflict="merge")
    print(f"✅ Applied {final_report['applied_count']} annotations")
else:
    print(f"❌ Errors found: {report['errors']}")
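This tutorial does not spell out the exact rules `apply_batch` follows for each `on_conflict` value. The toy model below shows one plausible reading of the ideas in the cell above:

- `merge` combines old and new values,
- `skip` leaves existing annotations alone,
- `dry_run=True` computes the report without mutating anything.

It works on flat `{key: set_of_values}` dicts. It is an illustration, not the library's implementation. The `overwrite` option is added here only for contrast.

```python
import copy


def apply_changes(existing, incoming, on_conflict="merge", dry_run=False):
    """Toy model of batch application with a conflict policy and dry-run mode."""
    if on_conflict not in {"merge", "skip", "overwrite"}:
        raise ValueError(f"Unknown on_conflict: {on_conflict!r}")

    # In dry-run mode, work on a copy so the caller's data is untouched.
    target = copy.deepcopy(existing) if dry_run else existing
    # "errors" is kept only to mirror the report keys used in this tutorial.
    report = {"applied_count": 0, "skipped_count": 0, "errors": []}

    for key, values in incoming.items():
        current = target.get(key)
        if not current:                # no conflict: just add
            target[key] = set(values)
            report["applied_count"] += 1
        elif on_conflict == "skip":    # keep what is already there
            report["skipped_count"] += 1
        elif on_conflict == "merge":   # union of old and new values
            current |= set(values)
            report["applied_count"] += 1
        else:                          # overwrite: replace old values
            target[key] = set(values)
            report["applied_count"] += 1
    return report
```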

## Processing Multiple SAM2 Files

Batch processing workflow for large datasets:

In [None]:
import glob
from pathlib import Path
import time

# Process multiple SAM2 files
sam2_files = glob.glob("data/sam2_annotations/*.json")
output_dir = Path("data/embryo_metadata")
output_dir.mkdir(parents=True, exist_ok=True)  # make sure the output directory exists

def process_sam2_file(sam2_path, output_dir):
    """Process a single SAM2 file."""
    try:
        start_time = time.time()
        
        # Initialize metadata
        metadata = EmbryoMetadata(str(sam2_path))
        
        # Generate output path
        output_path = output_dir / f"{Path(sam2_path).stem}_biology.json"
        
        # Save with custom path
        metadata.annotations_path = output_path
        metadata.file_handler.file_path = output_path
        metadata.save()
        
        processing_time = time.time() - start_time
        stats = metadata.get_stats()
        
        return {
            "file": sam2_path,
            "embryos": stats["embryo_count"],
            "snips": stats["total_snips"],
            "time": processing_time,
            "output": output_path,
            "status": "success"
        }
    except Exception as e:
        return {
            "file": sam2_path,
            "error": str(e),
            "status": "failed"
        }

# Process files
results = []
for sam2_file in sam2_files[:3]:  # Process first 3 files
    result = process_sam2_file(sam2_file, output_dir)
    results.append(result)
    
    if result["status"] == "success":
        print(f"✅ {Path(result['file']).name}: {result['embryos']} embryos, {result['time']:.2f}s")
    else:
        print(f"❌ {Path(result['file']).name}: {result['error']}")

# Summary
successful = [r for r in results if r["status"] == "success"]
total_embryos = sum(r["embryos"] for r in successful)
total_time = sum(r["time"] for r in successful)

print(f"\nSummary: {len(successful)}/{len(results)} files processed")
print(f"Total embryos: {total_embryos}")
print(f"Total time: {total_time:.2f}s")
if successful:  # avoid dividing by zero when every file failed
    print(f"Average: {total_time/len(successful):.2f}s per file")
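Each SAM2 file is processed independently, so the loop above can run in parallel. The sketch below uses the standard `concurrent.futures` module. It assumes the worker returns the same kind of status dict as `process_sam2_file`. With processes, the worker must be a picklable, module-level function. In a notebook, threads or a worker imported from a `.py` module are usually the safer choice. `run_in_parallel` is a hypothetical helper.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed


def run_in_parallel(worker, paths, max_workers=4, use_processes=True):
    """Run worker(path) for each path concurrently; return results in input order."""
    executor_cls = ProcessPoolExecutor if use_processes else ThreadPoolExecutor
    results = [None] * len(paths)
    with executor_cls(max_workers=max_workers) as pool:
        # Remember each future's position so results keep the input order.
        futures = {pool.submit(worker, p): i for i, p in enumerate(paths)}
        for future in as_completed(futures):
            i = futures[future]
            try:
                results[i] = future.result()
            except Exception as exc:  # worker raised instead of returning a dict
                results[i] = {"file": paths[i], "error": str(exc), "status": "failed"}
    return results
```

Usage with the function defined above might look like `run_in_parallel(functools.partial(process_sam2_file, output_dir=output_dir), sam2_files, use_processes=False)`.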

## Error Recovery and Validation

Robust error handling for production pipelines:

In [None]:
def robust_annotation_pipeline(sam2_path, annotation_plan):
    """
    Robust annotation pipeline with error recovery.
    
    Args:
        sam2_path: Path to SAM2 file
        annotation_plan: Dict with annotation instructions
    """
    try:
        # Initialize with validation
        metadata = EmbryoMetadata(sam2_path)
        
        # Create batch for safety
        batch = metadata.initialize_batch(mode="skeleton", author="pipeline")
        
        errors = []
        success_count = 0
        
        # Process annotation plan
        for operation in annotation_plan:
            try:
                if operation["type"] == "phenotype":
                    result = batch.add_phenotype(
                        phenotype=operation["phenotype"],
                        embryo_id=operation["embryo_id"],
                        target=operation.get("target", "all"),
                        overwrite_dead=operation.get("overwrite_dead", False)
                    )
                    success_count += result["count"]

                elif operation["type"] == "genotype":
                    batch.add_genotype(
                        gene=operation["gene"],
                        embryo_id=operation["embryo_id"],
                        zygosity=operation.get("zygosity", "unknown"),
                        overwrite=operation.get("overwrite", False)
                    )
                    success_count += 1

                else:
                    raise ValueError(f"Unknown operation type: {operation['type']!r}")

            except (KeyError, ValueError) as e:
                # KeyError means a required field is missing from the operation
                errors.append({
                    "operation": operation,
                    "error": str(e)
                })

        # Apply batch only if fewer than 50% of operations failed
        error_rate = len(errors) / len(annotation_plan) if annotation_plan else 0.0
        if error_rate < 0.5:
            apply_report = metadata.apply_batch(batch, on_conflict="merge")
            metadata.save()

            return {
                "status": "success",
                "applied": apply_report["applied_count"],
                "annotations_added": success_count,
                "errors": errors,
                "error_rate": error_rate
            }
        else:
            return {
                "status": "failed",
                "reason": "Too many errors",
                "errors": errors,
                "error_rate": error_rate
            }
            
    except Exception as e:
        return {
            "status": "critical_failure",
            "error": str(e)
        }

# Example annotation plan
annotation_plan = [
    {
        "type": "phenotype",
        "phenotype": "NORMAL",
        "embryo_id": "20240418_A01_e01",
        "target": "all"
    },
    {
        "type": "genotype",
        "gene": "tmem67",
        "embryo_id": "20240418_A01_e01",
        "zygosity": "homozygous"
    },
    {
        "type": "phenotype",
        "phenotype": "DEAD",
        "embryo_id": "20240418_A01_e01",
        "target": "200"
    }
]

# Run pipeline
result = robust_annotation_pipeline("path/to/sam2.json", annotation_plan)
print(f"Pipeline result: {result}")
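Following the "validate inputs before batch operations" advice, you can check a plan like the one above for missing keys and unknown operation types before creating any batch. The required-key sets below are read off the fields that `robust_annotation_pipeline` accesses with `operation[...]`. `validate_plan` is a hypothetical helper.

```python
# Required keys per operation type, taken from robust_annotation_pipeline above.
REQUIRED_KEYS = {
    "phenotype": {"phenotype", "embryo_id"},
    "genotype": {"gene", "embryo_id"},
}


def validate_plan(annotation_plan):
    """Return (index, problem) tuples; an empty list means the plan looks valid."""
    problems = []
    for i, op in enumerate(annotation_plan):
        op_type = op.get("type")
        if op_type not in REQUIRED_KEYS:
            problems.append((i, f"unknown operation type {op_type!r}"))
            continue
        missing = REQUIRED_KEYS[op_type] - set(op)
        if missing:
            problems.append((i, f"missing keys: {sorted(missing)}"))
    return problems
```

Running `validate_plan(annotation_plan)` first lets you reject a malformed plan outright, instead of relying on the 50% error threshold.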

## Performance Optimization

Tips for optimal performance with large datasets:

In [None]:
import time

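def benchmark_annotation_methods(metadata, embryo_ids):
    """Compare performance of different annotation approaches.

    Each method is timed on a different set of 10 embryos. Note that this
    adds real NORMAL phenotypes to `metadata`, so run it on a scratch copy.
    """
    if len(embryo_ids) < 30:
        raise ValueError("The benchmark needs at least 30 embryo IDs")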
    
    # Method 1: Individual operations
    start_time = time.time()
    for embryo_id in embryo_ids[:10]:
        metadata.add_phenotype("NORMAL", "benchmark", embryo_id, "all")
    individual_time = time.time() - start_time
    
    # Method 2: Batch operations
    start_time = time.time()
    batch = metadata.initialize_batch(mode="skeleton", author="benchmark")
    for embryo_id in embryo_ids[10:20]:
        batch.add_phenotype("NORMAL", embryo_id=embryo_id, target="all")
    metadata.apply_batch(batch)
    batch_time = time.time() - start_time
    
    # Method 3: Direct snip operations (when precise control needed)
    start_time = time.time()
    # Collect all snip IDs for efficient batch operation
    all_snips = []
    for embryo_id in embryo_ids[20:30]:
        embryo_snips = list(metadata.data["embryos"][embryo_id]["snips"].keys())
        all_snips.extend(embryo_snips[:5])  # First 5 snips per embryo
    
    metadata.add_phenotype("NORMAL", "benchmark", snip_ids=all_snips)
    direct_time = time.time() - start_time
    
    print("Performance comparison (10 embryos per method):")
    print(f"Individual operations: {individual_time:.3f}s")
    print(f"Batch operations: {batch_time:.3f}s")
    print(f"Direct snip operations (first 5 snips per embryo): {direct_time:.3f}s")
    print("\nRecommendation: Use batch operations for safety and reasonable performance")

# Run benchmark if you have metadata loaded
# benchmark_annotation_methods(metadata, metadata.list_embryos())
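`time.time()` is fine for rough numbers, but `time.perf_counter()` is the clock Python provides for measuring short intervals. A small context manager keeps the timing code out of the way. `timed` is a hypothetical helper, not part of the annotation package.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results):
    """Store the elapsed time of the with-block in results[label], even on error."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start


# Usage: wrap each approach you want to compare.
timings = {}
with timed("sum", timings):
    total = sum(range(100_000))
```

Inside the benchmark, each method could become a `with timed("batch", timings): ...` block, with `timings` printed at the end.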

## Integration with Analysis Pipelines

Export annotations for downstream analysis:

In [None]:
import pandas as pd

def export_annotations_for_analysis(metadata, output_path):
    """Export annotations as one CSV row per (snip, phenotype).

    Snips without phenotypes still get one row, with phenotype fields set to None.
    """

    records = []

    for embryo_id, embryo_data in metadata.data["embryos"].items():
        # Embryo-level info (the genotype applies to every frame of the embryo)
        genotype_info = embryo_data.get("genotype", {})

        # Frame-level info
        for snip_id, snip_data in embryo_data.get("snips", {}).items():
            base = {
                "embryo_id": embryo_id,
                "experiment_id": embryo_data.get("experiment_id"),
                "video_id": embryo_data.get("video_id"),
                "snip_id": snip_id,
                "frame_number": snip_data.get("frame_number"),
                "phenotype": None,
                "phenotype_author": None,
                "gene": genotype_info.get("gene"),
                "zygosity": genotype_info.get("zygosity"),
                "genotype_author": genotype_info.get("author")
            }

            phenotypes = snip_data.get("phenotypes", [])
            if not phenotypes:
                # Keep snips without phenotypes as a single row
                records.append(base)
            for phenotype in phenotypes:
                # One record per phenotype; overriding keys keeps column order
                records.append({
                    **base,
                    "phenotype": phenotype["value"],
                    "phenotype_author": phenotype["author"]
                })

    # Create DataFrame and save
    df = pd.DataFrame(records)
    df.to_csv(output_path, index=False)
    
    print(f"Exported {len(records)} annotation records to {output_path}")
    print(f"Columns: {list(df.columns)}")
    
    return df

# Example export
# df = export_annotations_for_analysis(metadata, "analysis_data.csv")
# print(df.head())
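Once the data is exported, a quick sanity check is to cross-tabulate phenotypes by genotype. With pandas this is `pd.crosstab(df["gene"], df["phenotype"])`. The stdlib-only sketch below does the same counting over record dicts shaped like those built in `export_annotations_for_analysis`, assuming the column names shown there. `phenotype_counts_by_gene` is hypothetical.

```python
from collections import Counter


def phenotype_counts_by_gene(records):
    """Count (gene, phenotype) pairs, skipping rows without a phenotype."""
    counts = Counter()
    for rec in records:
        if rec.get("phenotype") is None:
            continue  # placeholder rows for unannotated snips
        counts[(rec.get("gene"), rec["phenotype"])] += 1
    return counts
```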

## Advanced Conflict Resolution

Handle complex merge scenarios:

In [None]:
def advanced_batch_merge(metadata, batch1, batch2):
    """Apply two batches in sequence, each with its own conflict strategy."""

    # Batch 1: dry run first, then merge with existing annotations if clean
    report1 = metadata.apply_batch(batch1, on_conflict="merge", dry_run=True)
    print(f"Batch 1 dry run: {report1['applied_count']} would be applied")

    batch1_applied = 0
    if not report1['errors']:
        batch1_applied = metadata.apply_batch(batch1, on_conflict="merge")['applied_count']
        print("✅ Batch 1 applied successfully")

    # Batch 2: skip anything that conflicts with existing annotations,
    # including any just added by batch 1
    report2 = metadata.apply_batch(batch2, on_conflict="skip", dry_run=True)
    print(f"Batch 2 dry run: {report2['applied_count']} would be applied, {report2['skipped_count']} skipped")

    batch2_applied = 0
    if not report2['errors'] and report2['applied_count'] > 0:
        batch2_applied = metadata.apply_batch(batch2, on_conflict="skip")['applied_count']
        print("✅ Batch 2 applied with conflict skipping")

    # Report what was actually applied, not just what the dry runs predicted
    return {
        "batch1_applied": batch1_applied,
        "batch2_applied": batch2_applied,
        "batch2_skipped": report2['skipped_count']
    }

# Example usage with simulated batches
# result = advanced_batch_merge(metadata, batch1, batch2)
# print(f"Merge result: {result}")
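`merge` and `skip` are the two strategies used above. You may need a policy of your own, such as "keep every distinct phenotype, but credit whoever added it first". Such a policy can be written as a plain function over two annotation lists. This sketch assumes phenotype entries are dicts with `value` and `author` keys, as in the export code. `merge_phenotypes` is hypothetical and is not an `on_conflict` option of `apply_batch`.

```python
def merge_phenotypes(existing, incoming):
    """Union of two phenotype lists, deduplicated by value; first author wins."""
    merged = list(existing)  # new list, so `existing` is not mutated
    seen = {entry["value"] for entry in existing}
    for entry in incoming:
        if entry["value"] not in seen:
            merged.append(entry)
            seen.add(entry["value"])
    return merged
```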

## Monitoring and Logging

Production-ready logging and monitoring:

In [None]:
import logging
from datetime import datetime
from pathlib import Path

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('annotation_pipeline.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

def monitored_annotation_pipeline(sam2_files, output_dir):
    """Annotation pipeline with comprehensive monitoring."""
    
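    logger.info(f"Starting annotation pipeline for {len(sam2_files)} files")
    output_dir = Path(output_dir)  # accept a str or a Path
    output_dir.mkdir(parents=True, exist_ok=True)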
    
    stats = {
        "files_processed": 0,
        "files_failed": 0,
        "total_embryos": 0,
        "total_annotations": 0,
        "start_time": datetime.now()
    }
    
    for sam2_file in sam2_files:
        try:
            logger.info(f"Processing {sam2_file}")
            
            # Process file
            metadata = EmbryoMetadata(sam2_file)
            file_stats = metadata.get_stats()
            
            # Update statistics
            stats["files_processed"] += 1
            stats["total_embryos"] += file_stats["embryo_count"]
            stats["total_annotations"] += file_stats["total_phenotypes"]
            
            # Save with monitoring
            output_path = output_dir / f"{Path(sam2_file).stem}_biology.json"
            metadata.annotations_path = output_path
            metadata.file_handler.file_path = output_path
            metadata.save()
            
            logger.info(f"Completed {sam2_file}: {file_stats['embryo_count']} embryos")
            
        except Exception as e:
            stats["files_failed"] += 1
            # logger.exception logs at ERROR level and includes the traceback
            logger.exception(f"Failed to process {sam2_file}: {e}")
    
    # Final statistics
    stats["end_time"] = datetime.now()
    stats["duration"] = (stats["end_time"] - stats["start_time"]).total_seconds()
    
    logger.info("Pipeline completed:")
    logger.info(f"  Files processed: {stats['files_processed']}/{len(sam2_files)}")
    logger.info(f"  Total embryos: {stats['total_embryos']}")
    logger.info(f"  Total phenotype annotations: {stats['total_annotations']}")
    logger.info(f"  Duration: {stats['duration']:.1f}s")
    if stats["duration"] > 0:  # avoid dividing by zero on an empty run
        logger.info(f"  Rate: {stats['files_processed']/stats['duration']:.2f} files/s")
    
    return stats

# Example usage
# sam2_files = glob.glob("data/sam2_annotations/*.json")
# output_dir = Path("data/embryo_metadata")
# stats = monitored_annotation_pipeline(sam2_files, output_dir)
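For longer runs it helps to log how long each file took, not just the totals. The sketch below builds a small context manager on the standard `logging` module to wrap each per-file step. On failure it calls `logger.exception`, so the traceback is recorded. `log_step` and the logger name `annotation_pipeline` are hypothetical.

```python
import logging
import time
from contextlib import contextmanager

step_logger = logging.getLogger("annotation_pipeline")


@contextmanager
def log_step(name):
    """Log the start, duration, and any failure (with traceback) of a step."""
    step_logger.info("Starting %s", name)
    start = time.perf_counter()
    try:
        yield
    except Exception:
        step_logger.exception("Failed %s after %.2fs", name, time.perf_counter() - start)
        raise  # let the caller decide whether to continue
    else:
        step_logger.info("Finished %s in %.2fs", name, time.perf_counter() - start)
```

Inside the per-file loop this could read `with log_step(sam2_file): ...`, leaving the existing `try`/`except` to update the failure counters.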

## Best Practices Summary

### For Pipeline Integration:

1. **Use batch operations** for safety and isolation
2. **Implement dry-run validation** before applying changes
3. **Handle errors gracefully** with appropriate recovery
4. **Use appropriate conflict resolution** strategies
5. **Monitor performance** and log operations
6. **Export data** in analysis-ready formats

### Performance Guidelines:

- **Batch size**: ~100-1000 embryos per batch is a reasonable starting point; benchmark on your own data to tune it
- **File operations**: Use atomic saves to prevent corruption
- **Memory usage**: Monitor with large datasets (>10k embryos)
- **Parallelization**: Process independent SAM2 files in parallel
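You can apply the batch-size guideline above by splitting the embryo list into fixed-size chunks and creating one batch per chunk. The chunking helper below is hypothetical and uses only the standard library. The commented loop reuses the batch API shown earlier in this tutorial. The chunk size of 500 is only an example.

```python
from itertools import islice


def chunked(items, size):
    """Yield successive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk


# Per-chunk batching with the API used earlier in this tutorial:
# for chunk in chunked(metadata.list_embryos(), 500):
#     batch = metadata.initialize_batch(mode="skeleton", author="pipeline")
#     for embryo_id in chunk:
#         batch.add_phenotype("NORMAL", embryo_id=embryo_id, target="all")
#     metadata.apply_batch(batch, on_conflict="merge")
```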

### Error Handling:

- **Validate inputs** before batch operations
- **Use dry-run mode** for complex operations
- **Implement retries** for transient failures
- **Log all operations** for debugging
- **Monitor error rates** and alert on anomalies
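"Implement retries for transient failures" usually means two things. First, retry a bounded number of times with a growing delay. Second, retry only on errors that are plausibly transient, such as an `OSError` from a network filesystem, and never on validation errors. A minimal sketch follows. `retry` and its parameters are hypothetical, and the `sleep` argument exists so the delay can be replaced in tests.

```python
import time


def retry(func, attempts=3, base_delay=1.0, retry_on=(OSError,), sleep=time.sleep):
    """Call func(); on a retryable error wait base_delay * 2**n and try again."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            return func()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)  # exponential backoff
```

For example, `retry(lambda: EmbryoMetadata(path))` would retry loading a file from a flaky mount. A `ValueError` from bad input would still fail immediately.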

This system is designed to be robust and performant, and to integrate with your existing bioinformatics pipelines. 🚀