In [None]:
# 🚀 Syft Code Queue Tutorial

This notebook demonstrates how to use **syft-code-queue** - a simple, lightweight system for executing code on remote SyftBox datasites.

## What is Syft Code Queue?

- **Simple**: Submit code folders with `run.sh` scripts
- **Secure**: Auto-approval rules and a restricted runner (timeouts, output limits, blocked commands)
- **Lightweight**: Much simpler than RDS
- **AI-Ready**: Well suited to executing AI-generated code

## Architecture

```
Client App → Submit Code → Remote Queue → Auto-Approve → Execute → Results
```
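To make the stages concrete, here is a toy, in-process model of that flow. Every name in it (`ToyJob`, `auto_approve`, `execute`, `process`) is hypothetical and only illustrates the stages; it is not how syft-code-queue is implemented, and a real queue runs across datasites rather than in one process.

```python
# Toy model of: Submit Code -> Remote Queue -> Auto-Approve -> Execute -> Results.
# All names are hypothetical; this is not syft-code-queue's implementation.
from collections import deque
from dataclasses import dataclass


@dataclass
class ToyJob:
    name: str
    tags: list
    status: str = "pending"
    output: str = ""


def auto_approve(toy_job):
    # Stand-in approval rule: approve anything tagged "demo"
    return "demo" in toy_job.tags


def execute(toy_job):
    # Stand-in for running the job's run.sh on the datasite
    return f"ran {toy_job.name}"


def process(toy_queue):
    results = []
    while toy_queue:
        toy_job = toy_queue.popleft()          # Remote Queue
        if not auto_approve(toy_job):          # Auto-Approve
            results.append(toy_job)            # stays "pending" for manual review
            continue
        toy_job.status = "approved"
        toy_job.output = execute(toy_job)      # Execute
        toy_job.status = "completed"
        results.append(toy_job)                # Results
    return results


# Submit Code: two jobs, only one of which matches the approval rule
toy_queue = deque([ToyJob("demo-job", ["demo"]), ToyJob("other-job", ["misc"])])
for toy in process(toy_queue):
    print(toy.name, toy.status, toy.output)
```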


In [None]:
## 📦 Installation

```bash
pip install syft-code-queue
```


In [None]:
# Import the library
import syft_code_queue as scq
from pathlib import Path
import tempfile
import time
import json

print(f"Syft Code Queue version: {scq.__version__}")


In [None]:
## 🏗️ Creating Your First Code Package

Every code submission must be a folder containing an executable `run.sh` script, which serves as the entry point:
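Because the whole package hinges on `run.sh`, a quick pre-flight check can catch the most common mistakes before submission. The helper below, `check_code_package`, is hypothetical (not part of syft-code-queue); it assumes only the requirement stated here plus two Unix conventions: the shebang must be the first line, and the file needs its execute bit.

```python
# Hypothetical pre-flight check, not part of syft-code-queue: confirm a folder
# has a run.sh whose first line is a shebang and which is marked executable.
import os
import tempfile
from pathlib import Path


def check_code_package(folder):
    """Return a list of problems; an empty list means the folder looks OK."""
    folder = Path(folder)
    if not folder.is_dir():
        return [f"{folder} is not a directory"]
    run_sh = folder / "run.sh"
    if not run_sh.is_file():
        return ["missing run.sh"]
    problems = []
    first_line = run_sh.read_text().splitlines()[:1]
    if not first_line or not first_line[0].startswith("#!"):
        problems.append("run.sh does not start with a shebang such as #!/bin/bash")
    if not os.access(run_sh, os.X_OK):
        problems.append("run.sh is not executable (try chmod 755)")
    return problems


# Usage: a throwaway package, checked before and after setting the execute bit
demo_pkg = Path(tempfile.mkdtemp())
(demo_pkg / "run.sh").write_text("#!/bin/bash\necho hi\n")
print(check_code_package(demo_pkg))
(demo_pkg / "run.sh").chmod(0o755)
print(check_code_package(demo_pkg))
```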


In [None]:
# Create a simple analysis package
def create_analysis_package():
    # Create temporary directory
    package_dir = Path(tempfile.mkdtemp())
    
    # Create Python analysis script
    analysis_script = package_dir / "analysis.py"
    analysis_script.write_text('''
import os
import json
from datetime import datetime

def main():
    print(f"🔍 Starting analysis...")
    print(f"Job ID: {os.environ.get('SYFT_JOB_ID', 'unknown')}")
    print(f"Requester: {os.environ.get('SYFT_REQUESTER', 'unknown')}")
    
    # Simulate some analysis
    results = {
        "timestamp": datetime.now().isoformat(),
        "analysis_type": "sample_analysis",
        "records_processed": 1000,
        "insights": [
            "Data quality is good",
            "No missing values detected",
            "Trend analysis complete"
        ]
    }
    
    # Save results
    output_dir = os.environ.get('SYFT_OUTPUT_DIR', '.')
    with open(f"{output_dir}/results.json", 'w') as f:
        json.dump(results, f, indent=2)
    
    print("✅ Analysis complete!")
    print(f"Results saved to: {output_dir}/results.json")

if __name__ == "__main__":
    main()
    ''')
    
    # Create run.sh script
    run_script = package_dir / "run.sh"
    # The shebang must be the very first line, so the string starts right after the quotes
    run_script.write_text('''#!/bin/bash
set -e

echo "🚀 Starting job execution..."
echo "Job: $SYFT_JOB_NAME"
echo "Requester: $SYFT_REQUESTER"
echo "Output Directory: $SYFT_OUTPUT_DIR"

# Run the Python analysis
python analysis.py

echo "🎉 Job completed successfully!"
    ''')
    
    # Make executable
    run_script.chmod(0o755)
    
    return package_dir

# Create our first package
my_package = create_analysis_package()
print(f"📦 Created package at: {my_package}")
print(f"📁 Contents: {list(my_package.iterdir())}")


In [None]:
## 🚀 Submitting Your First Job

Now let's submit this code package for execution on a remote datasite:


In [None]:
# Submit the job
target_email = "data-owner@example.com"  # Replace with actual datasite email

job = scq.submit_code(
    target_email=target_email,
    code_folder=my_package,
    name="Sample Data Analysis",
    description="A simple analysis to demonstrate syft-code-queue",
    tags=["demo", "analysis", "tutorial"],
    auto_approval=True  # Allow auto-approval if rules permit
)

print(f"✅ Job submitted successfully!")
print(f"📋 Job ID: {job.uid}")
print(f"📧 Target: {job.target_email}")
print(f"🏷️  Status: {job.status.value}")
print(f"📅 Created: {job.created_at}")


In [None]:
## 📊 Monitoring Job Status

Let's check on our job and see how to monitor its progress:
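A single status check only tells you where the job is right now. To wait for it to finish, you can poll until it reaches a terminal state. The sketch below is generic: it takes any zero-argument callable that returns a status string, and the terminal states (`completed`, `failed`, `rejected`) are taken from the job lifecycle described in this notebook.

```python
# Generic polling loop; pass any callable that returns the job's current status string.
import time

TERMINAL_STATES = {"completed", "failed", "rejected"}


def wait_for_terminal_status(get_status, timeout=300.0, interval=5.0):
    """Poll get_status() until it returns a terminal state or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {status!r} after {timeout}s")
        time.sleep(interval)
```

With the client from this notebook, `get_status` could be `lambda: client.get_job(job.uid).status.value`, assuming `get_job` keeps returning the job rather than `None`.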


In [None]:
# Create a client to interact with jobs
client = scq.create_client()

# Check specific job status
current_job = client.get_job(job.uid)
if current_job:
    print(f"📋 Job: {current_job.name}")
    print(f"🏷️  Status: {current_job.status.value}")
    print(f"⏰ Updated: {current_job.updated_at}")
    
    if current_job.started_at:
        print(f"🚀 Started: {current_job.started_at}")
    
    if current_job.completed_at:
        print(f"✅ Completed: {current_job.completed_at}")
        print(f"⏱️  Duration: {current_job.duration:.2f}s")
    
    if current_job.error_message:
        print(f"❌ Error: {current_job.error_message}")
else:
    print("❌ Job not found")


In [None]:
## 📋 Listing Jobs

You can list recent jobs and filter them by status or target email:
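If you only need a per-status summary, one alternative to a separate filtered call per status is to fetch a list once and tally it locally. The sketch below works on any objects shaped like this notebook's jobs (a `.status` enum with a string `.value`); `DemoStatus` and `DemoJob` are hypothetical stand-ins so the snippet runs on its own. Note that a `limit` on the listing call would also cap the tally.

```python
# Tally jobs by status locally. DemoStatus/DemoJob are hypothetical stand-ins
# for the job objects returned by the client in this notebook.
from collections import Counter
from dataclasses import dataclass
from enum import Enum


class DemoStatus(Enum):
    pending = "pending"
    completed = "completed"
    failed = "failed"


@dataclass
class DemoJob:
    name: str
    status: DemoStatus


def count_by_status(jobs):
    """Map each status string to the number of jobs in that state."""
    return Counter(j.status.value for j in jobs)


sample_jobs = [
    DemoJob("a", DemoStatus.pending),
    DemoJob("b", DemoStatus.completed),
    DemoJob("c", DemoStatus.completed),
]
print(count_by_status(sample_jobs))
```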


In [None]:
# List all recent jobs
print("📋 Recent Jobs:")
print("=" * 50)

# Use a distinct loop variable so the `job` we submitted earlier is not overwritten
recent_jobs = client.list_jobs(limit=10)
for listed_job in recent_jobs:
    status_emoji = {
        "pending": "⏳",
        "approved": "✅",
        "running": "🏃",
        "completed": "🎉",
        "failed": "❌",
        "rejected": "🚫"
    }.get(listed_job.status.value, "❓")
    
    print(f"{status_emoji} {listed_job.name} - {listed_job.status.value}")
    print(f"   📧 Target: {listed_job.target_email}")
    print(f"   📅 Created: {listed_job.created_at}")
    print()


In [None]:
# Filter jobs by status
pending_jobs = client.list_jobs(status=scq.JobStatus.pending)
print(f"⏳ Pending jobs: {len(pending_jobs)}")

completed_jobs = client.list_jobs(status=scq.JobStatus.completed)
print(f"🎉 Completed jobs: {len(completed_jobs)}")

# Filter by target email
target_jobs = client.list_jobs(target_email=target_email)
print(f"📧 Jobs for {target_email}: {len(target_jobs)}")


In [None]:
## 📊 Retrieving Results

Once a job completes, you can retrieve its outputs and logs:
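Output files come from code you did not run locally, so it is worth reading them defensively. The helper below, `load_results`, is a hypothetical convenience that assumes only what this notebook shows: an output directory that may or may not contain a `results.json`.

```python
# Defensive reader for a job's results.json; returns None instead of raising
# when the file is missing or is not valid JSON.
import json
import tempfile
from pathlib import Path


def load_results(output_dir, filename="results.json"):
    """Return the parsed JSON results, or None if the file is absent or invalid."""
    path = Path(output_dir) / filename
    if not path.is_file():
        return None
    try:
        return json.loads(path.read_text())
    except json.JSONDecodeError:
        return None


# Usage with a throwaway directory standing in for a job's output folder
demo_out = Path(tempfile.mkdtemp())
(demo_out / "results.json").write_text(json.dumps({"records_processed": 1000}))
print(load_results(demo_out))
```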


In [None]:
# Check if job completed and get results
def check_job_results(job_uid):
    job = client.get_job(job_uid)
    if not job:
        print("❌ Job not found")
        return
    
    print(f"📋 Job: {job.name}")
    print(f"🏷️  Status: {job.status.value}")
    
    if job.status == scq.JobStatus.completed:
        print("\n📊 Results:")
        
        # Get output directory
        output_path = client.get_job_output(job_uid)
        if output_path and output_path.exists():
            print(f"📁 Output directory: {output_path}")
            
            # List output files
            output_files = list(output_path.iterdir())
            print(f"📄 Output files: {[f.name for f in output_files]}")
            
            # Show results.json if it exists
            results_file = output_path / "results.json"
            if results_file.exists():
                results = json.loads(results_file.read_text())
                print("\n📈 Analysis Results:")
                for key, value in results.items():
                    print(f"   {key}: {value}")
        
        # Get execution logs
        logs = client.get_job_logs(job_uid)
        if logs:
            print("\n📋 Execution Logs:")
            print("-" * 40)
            print(logs)
    
    elif job.status == scq.JobStatus.failed:
        print(f"❌ Job failed: {job.error_message}")
    
    elif job.status == scq.JobStatus.rejected:
        print(f"🚫 Job rejected: {job.error_message}")
    
    else:
        print(f"⏳ Job still {job.status.value}...")

# Check our job results
check_job_results(job.uid)


In [None]:
## 🖥️ Running a Queue Server

On the data owner's side, a queue server reviews incoming jobs, applies auto-approval rules, and executes approved jobs:
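The server settings below include `max_concurrent_jobs` and `job_timeout`. As a conceptual sketch of what such limits mean (this is not syft-code-queue's server code), the snippet runs commands with at most `max_workers` in flight and marks any that exceed `timeout` seconds as failed. The three stand-in "jobs" are small Python subprocesses.

```python
# Conceptual sketch of bounded concurrency plus a per-job timeout.
# Not syft-code-queue's server implementation.
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor


def run_one(cmd, timeout):
    """Run one command and report completed, failed, or failed (timeout)."""
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        return "failed (timeout)"  # subprocess.run kills the child before raising
    return "completed" if proc.returncode == 0 else "failed"


def run_jobs(commands, max_workers=2, timeout=300):
    """Run commands with at most max_workers at once; results keep input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(lambda cmd: run_one(cmd, timeout), commands))


# Stand-in jobs: one succeeds, one exits non-zero, one overruns the timeout
demo_commands = [
    [sys.executable, "-c", "print('ok')"],
    [sys.executable, "-c", "raise SystemExit(1)"],
    [sys.executable, "-c", "import time; time.sleep(10)"],
]
statuses = run_jobs(demo_commands, max_workers=2, timeout=2)
print(statuses)
```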


In [None]:
# Define custom auto-approval rules
def custom_approval_rules(job):
    """Custom logic for auto-approving jobs."""
    print(f"🔍 Reviewing job: {job.name}")
    
    # Auto-approve demo/tutorial jobs.
    # Note: tags are chosen by the requester, so tag-based rules are a convenience, not a security check.
    if "demo" in job.tags or "tutorial" in job.tags:
        print("✅ Auto-approved: Demo/tutorial job")
        return True
    
    # Auto-approve analysis jobs with safe tags
    safe_tags = {"analysis", "visualization", "statistics", "report"}
    if any(tag in safe_tags for tag in job.tags):
        print("✅ Auto-approved: Safe analysis job")
        return True
    
    # Auto-approve trusted requesters. Match the end of the address: a substring
    # check would also accept e.g. "x@company.com.evil.net"
    trusted_domains = ("@company.com", "@university.edu")
    if job.requester_email.lower().endswith(trusted_domains):
        print("✅ Auto-approved: Trusted requester")
        return True
    
    print("⚠️  Requires manual approval")
    return False

# Create server with custom rules
server = scq.create_server(
    auto_approval_callback=custom_approval_rules,
    max_concurrent_jobs=2,
    job_timeout=300,  # 5 minutes
    auto_approval_enabled=True
)

print("🖥️  Queue server created with custom approval rules")
print(f"📧 Server email: {server.email}")


In [None]:
# Start the server (runs in background)
print("🚀 Starting queue server...")
server.start()

# Show pending jobs
pending = server.list_pending_jobs()
print(f"⏳ Pending jobs: {len(pending)}")

# Let it run for a moment
time.sleep(5)

print("\n🛑 Stopping server...")
server.stop()
print("✅ Server stopped")


In [None]:
## 🛡️ Security Features

Syft Code Queue includes a restricted runner, `SafeCodeRunner`, that enforces timeouts, output-size limits, and command restrictions:
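To show the idea behind `blocked_commands` and `allowed_commands`, here is a deliberately naive check over a `run.sh` script. It is an assumption about how such a check *might* work, not how `SafeCodeRunner` works. It only inspects the first word of each line, so `rm` hidden behind `sudo` (or behind `python -c`, variables, and so on) slips past the blocklist; treat it as an illustration, not a sandbox.

```python
# Naive blocklist/allowlist check over a shell script. Illustrative only:
# it looks at the first word of each line and is easy to evade.
import shlex


def first_commands(script):
    """Yield the first word of each non-empty, non-comment line."""
    for line in script.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        yield shlex.split(line)[0]


def check_script(script, blocked, allowed=None):
    """Return the commands that are blocked or, if an allowlist is given, not allowed."""
    violations = []
    for cmd in first_commands(script):
        if cmd in blocked or (allowed is not None and cmd not in allowed):
            violations.append(cmd)
    return violations


demo_script = """#!/bin/bash
set -e
echo "Starting"
python analysis.py
curl -O https://example.com/data.csv
sudo rm -rf /tmp/scratch
"""
blocked = {"rm", "sudo", "chmod", "passwd"}
print(check_script(demo_script, blocked))  # "rm" is missed: it is an argument to sudo
print(check_script(demo_script, blocked, allowed={"set", "echo", "python"}))
```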


In [None]:
# Using SafeCodeRunner with security restrictions
from syft_code_queue import SafeCodeRunner

# Create a safe runner with restrictions
safe_runner = SafeCodeRunner(
    timeout=300,  # 5 minute timeout
    max_output_size=10*1024*1024,  # 10MB max output
    blocked_commands=["rm", "sudo", "chmod", "passwd"],  # Blocked commands
    allowed_commands=["python", "pip", "echo", "cat"]  # Optional whitelist
)

print("🛡️  SafeCodeRunner configured with security restrictions")
print(f"⏰ Timeout: {safe_runner.timeout}s")
print(f"📊 Max output: {safe_runner.max_output_size / 1024 / 1024}MB")
print(f"🚫 Blocked commands: {safe_runner.blocked_commands}")


In [None]:
## 🔧 Configuration Options

`QueueConfig` lets you customize the queue name, concurrency, timeouts, cleanup, and auto-approval:
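The timing options are plain seconds: `job_timeout=600` is 10 minutes and `cleanup_completed_after=7200` is 2 hours. One plausible reading of the cleanup setting, sketched below with a hypothetical helper, is that a completed job becomes eligible for cleanup once that many seconds have passed since it finished; check the library's documentation for its exact behavior.

```python
# Hypothetical helper: is a completed job old enough to clean up?
from datetime import datetime, timedelta


def eligible_for_cleanup(completed_at, now, cleanup_after_seconds):
    """True once cleanup_after_seconds have elapsed since completed_at."""
    return now - completed_at >= timedelta(seconds=cleanup_after_seconds)


finished = datetime(2024, 1, 1, 12, 0)
print(eligible_for_cleanup(finished, finished + timedelta(hours=1), 7200))  # False: 1 h of a 2 h window
print(eligible_for_cleanup(finished, finished + timedelta(hours=2), 7200))  # True: window has elapsed
```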


In [None]:
# Custom configuration
from syft_code_queue import QueueConfig

config = QueueConfig(
    queue_name="my-analysis-queue",
    max_concurrent_jobs=3,
    job_timeout=600,  # 10 minutes
    cleanup_completed_after=7200,  # 2 hours
    auto_approval_enabled=True
)

print("⚙️ Custom Configuration:")
print(f"📛 Queue name: {config.queue_name}")
print(f"🔄 Max concurrent: {config.max_concurrent_jobs}")
print(f"⏰ Job timeout: {config.job_timeout}s")
print(f"🧹 Cleanup after: {config.cleanup_completed_after}s")
print(f"✅ Auto-approval: {config.auto_approval_enabled}")

# Use custom config
custom_client = scq.CodeQueueClient(config=config)
print(f"\n📧 Custom client email: {custom_client.email}")


In [None]:
## 🤖 AI Integration Example

Here's how you might integrate with AI systems to generate and execute code:
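Generated code can be malformed, and a syntax error is cheaper to catch locally than after a round trip through the queue. The helper below uses Python's built-in `compile()`, which parses the source without running it. A clean parse says nothing about whether the code is correct or safe, so this complements review and approval rules rather than replacing them.

```python
# Parse generated source before packaging it; compile() does not execute the code.
def syntax_error_in(source, filename="ai_code.py"):
    """Return None if `source` parses as Python, else a short error description."""
    try:
        compile(source, filename, "exec")
    except SyntaxError as exc:
        return f"{filename}:{exc.lineno}: {exc.msg}"
    return None


print(syntax_error_in('print("Hello from AI-generated code!")'))
print(syntax_error_in("def broken(:\n    pass"))
```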


In [None]:
def create_ai_package(user_prompt):
    """Create a code package from AI-generated code (mock example)."""
    
    # Mock AI code generation (replace with syft-nsai)
    if "visualization" in user_prompt.lower():
        ai_code = '''
import matplotlib
matplotlib.use("Agg")  # Non-interactive backend: the datasite has no display
import matplotlib.pyplot as plt
import numpy as np
import os

# Generate sample data
x = np.linspace(0, 10, 100)
y = np.sin(x)

# Create plot
plt.figure(figsize=(10, 6))
plt.plot(x, y, 'b-', linewidth=2)
plt.title('AI-Generated Visualization')
plt.xlabel('X values')
plt.ylabel('Y values')
plt.grid(True)

# Save plot
output_dir = os.environ.get('SYFT_OUTPUT_DIR', '.')
plt.savefig(f'{output_dir}/ai_visualization.png')
print("📊 Visualization saved!")
        '''
        requirements = ["matplotlib", "numpy"]
    else:
        ai_code = 'print("Hello from AI-generated code!")'
        requirements = []
    
    # Create package
    package_dir = Path(tempfile.mkdtemp())
    (package_dir / "ai_code.py").write_text(ai_code)
    
    if requirements:
        (package_dir / "requirements.txt").write_text("\n".join(requirements))
    
    # Create run.sh
    run_script = package_dir / "run.sh"
    run_script.write_text('''#!/bin/bash
set -e

echo "🤖 Running AI-generated code..."

# Install requirements if needed
if [ -f requirements.txt ]; then
    pip install -r requirements.txt
fi

# Run AI code
python ai_code.py

echo "✅ AI code execution complete!"
    ''')
    run_script.chmod(0o755)
    
    return package_dir

# Example AI workflow
user_prompt = "Create a visualization of a sine wave"
print(f"👤 User: {user_prompt}")

ai_package = create_ai_package(user_prompt)
print(f"🤖 AI generated code package: {ai_package}")

# Submit AI-generated code
ai_job = scq.submit_code(
    target_email=target_email,
    code_folder=ai_package,
    name="AI-Generated Visualization",
    description=f"Code generated from prompt: {user_prompt}",
    tags=["ai-generated", "visualization", "automated"],
    auto_approval=True
)

print(f"🚀 AI job submitted: {ai_job.uid}")


In [None]:
## 📋 Job Lifecycle Summary

Understanding the complete job lifecycle:

```
📤 Submit → ⏳ pending → ✅ approved → 🏃 running → 🎉 completed
                      ↘ 🚫 rejected            ↘ ❌ failed
```

### Status Reference:
- **⏳ pending**: Waiting for approval
- **✅ approved**: Approved, waiting to run
- **🏃 running**: Currently executing  
- **🎉 completed**: Finished successfully
- **❌ failed**: Execution failed
- **🚫 rejected**: Rejected by data owner
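The lifecycle diagram and status reference above can also be written as a transition table, which is handy for sanity-checking a job's recorded history. The table is read directly off the diagram; the library may permit other transitions (for example, cancellation), so treat it as illustrative.

```python
# Transition table read off the lifecycle diagram; illustrative, not the library's rules.
ALLOWED_TRANSITIONS = {
    "pending": {"approved", "rejected"},
    "approved": {"running"},
    "running": {"completed", "failed"},
    "completed": set(),
    "failed": set(),
    "rejected": set(),
}


def is_valid_history(statuses):
    """Check that each consecutive pair of statuses is an allowed transition."""
    return all(b in ALLOWED_TRANSITIONS[a] for a, b in zip(statuses, statuses[1:]))


print(is_valid_history(["pending", "approved", "running", "completed"]))
print(is_valid_history(["pending", "running"]))  # skips approval
```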

## 🎯 Best Practices

1. **Code Structure**: Always include an executable `run.sh`, starting with a `#!/bin/bash` shebang, as the entry point
2. **Environment Variables**: Use `SYFT_OUTPUT_DIR` for results
3. **Error Handling**: Use `set -e` in bash scripts
4. **Dependencies**: Include `requirements.txt` when needed
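5. **Security**: Tag jobs accurately so auto-approval rules can match them; data owners should remember that tags are set by the requester and not rely on them alone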
6. **Testing**: Test code locally before submission
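For the local-testing practice, a dry run can mimic the environment variables this notebook's scripts read. The sketch below runs `run.sh` with `bash` from inside the package folder, with `SYFT_OUTPUT_DIR` pointing at a fresh temporary directory. The other `SYFT_*` values are placeholders, not what a datasite would actually set, and `dry_run` is a hypothetical helper. Invoking `bash run.sh` does not need the execute bit, so also check permissions before submitting.

```python
# Local dry run of a code package. The SYFT_* values are placeholders;
# a real datasite sets its own.
import os
import subprocess
import tempfile
from pathlib import Path


def dry_run(package_dir, timeout=60):
    """Run package_dir/run.sh locally; return (exit code, combined output, output dir)."""
    output_dir = Path(tempfile.mkdtemp())
    env = {
        **os.environ,
        "SYFT_OUTPUT_DIR": str(output_dir),
        "SYFT_JOB_ID": "local-test",
        "SYFT_JOB_NAME": "local-test",
        "SYFT_REQUESTER": "me@localhost",
    }
    proc = subprocess.run(
        ["bash", "run.sh"], cwd=package_dir, env=env,
        capture_output=True, text=True, timeout=timeout,
    )
    return proc.returncode, proc.stdout + proc.stderr, output_dir
```

For the package built earlier, `code, log, out = dry_run(my_package)` would run `analysis.py` (assuming `python` is on your `PATH`), after which `list(out.iterdir())` shows what the job wrote.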

## 🔗 Integration Points

- **syft-nsai**: Generate code with AI, execute with queue
- **SyftBox**: Leverages existing datasite infrastructure  
- **Custom Apps**: Easy integration with any Python application

## 📚 Next Steps

- Explore the `examples/` directory for more samples
- Read the API documentation
- Set up custom approval rules for your use case
- Integrate with your existing data science workflows

---

**Happy coding with Syft Code Queue! 🚀**
