# SyftBox File Observer Tutorial

This tutorial demonstrates how to automatically create SyftMessages when files change in your datasites folder.

## Overview

The SyftBox File Observer:
- Monitors `~/SyftBox_{email}/datasites/` for file changes
- Creates SyftMessages automatically for file events
- Stores messages in `~/SyftBox_{email}/outbox/` ready for transport
- Provides a REST API for monitoring and control

## Setup

First, let's import required libraries and set up our environment:

In [None]:
import syft_serve as ss
import syft_client as sc
from pathlib import Path
import requests
import time
import json

# Configuration: every path used in this tutorial hangs off the per-email
# SyftBox folder in the user's home directory.
EMAIL = "andrew@openmined.org"  # Change to your email
SYFTBOX_DIR = Path.home().joinpath(f"SyftBox_{EMAIL}")
DATASITES_DIR = SYFTBOX_DIR.joinpath("datasites")
OUTBOX_DIR = SYFTBOX_DIR.joinpath("outbox")

# Create the watched folder and the outbox; a no-op when they already exist.
for required_dir in (DATASITES_DIR, OUTBOX_DIR):
    required_dir.mkdir(parents=True, exist_ok=True)

# Show the resolved locations
print(f"📁 SyftBox: {SYFTBOX_DIR}")
print(f"📂 Datasites: {DATASITES_DIR}")
print(f"📤 Outbox: {OUTBOX_DIR}")

## Simple Observer Example

Let's start with a simple observer that logs file changes:

In [None]:
# Terminate any existing servers before the "simple_observer" server is
# created further down in this cell.
ss.servers.terminate_all()

def simple_observer():
    """Start a watchdog observer that logs file events under DATASITES_DIR.

    Directory events are ignored. Every other event is printed to stdout as
    ``📝 <event type>: <file name>``. The watch is recursive.

    Returns:
        dict: ``status`` (a fixed confirmation string) and ``watching``
        (the watched directory as a string).
    """
    import sys
    from watchdog.events import FileSystemEventHandler
    from watchdog.observers import Observer

    # Switch stdout to line buffering so each printed event is flushed per line.
    sys.stdout.reconfigure(line_buffering=True)

    watched_dir = str(DATASITES_DIR)

    class SimpleHandler(FileSystemEventHandler):
        """Print one line for every non-directory event."""

        def on_any_event(self, event):
            if event.is_directory:
                return
            print(f"📝 {event.event_type}: {Path(event.src_path).name}", flush=True)

    watcher = Observer()
    watcher.schedule(SimpleHandler(), watched_dir, recursive=True)
    watcher.start()

    return {"status": "Observer started", "watching": watched_dir}

# Create a server that exposes the observer behind a single endpoint
server = ss.create(
    name="simple_observer",
    dependencies=["watchdog"],
    endpoints={"/start": simple_observer}
)

print(f"\n🌐 Server: {server.url}")

# Start the observer.
# The timeout keeps the cell from hanging forever if the server never
# answers, and raise_for_status() reports an HTTP error directly instead of
# letting it surface as a confusing JSON/KeyError on the next line.
response = requests.get(f"{server.url}/start", timeout=10)
response.raise_for_status()
print(f"✅ {response.json()['status']}")

### Test the Simple Observer

In [None]:
# Write a small CSV into the watched folder to trigger file events
test_file = DATASITES_DIR.joinpath("test_data.csv")
csv_rows = ["id,name,value", "1,Alice,100", "2,Bob,200"]
test_file.write_text("\n".join(csv_rows))
print(f"Created: {test_file.name}")

# Pause briefly before reading the server output
time.sleep(1)

# Show the last five lines the server wrote to stdout
server_stdout = ss.servers['simple_observer'].stdout
logs = server_stdout.lines()[-5:]
print("\nRecent logs:")
for log in logs:
    print(f"  {log}")

## Full SyftMessage Observer

Now let's create the full observer that creates SyftMessages:

In [None]:
# Terminate previous server
ss.servers.terminate_all()

# Module-level state read and updated by start_syft_observer() and
# get_status(): the running observer (or None), a message counter, and a
# summary of the most recent event (or None).
state = dict(observer=None, messages_created=0, last_event=None)

def start_syft_observer():
    """Start (at most once) an observer that turns file events into SyftMessages.

    A watchdog observer is scheduled recursively on ``DATASITES_DIR``. For
    every created/modified/deleted file that is not hidden and not a ``.tmp``
    file, a ``file_update`` SyftMessage is built under ``OUTBOX_DIR``,
    finalized, and recorded in the module-level ``state`` dict.

    The function relies on these module-level names: ``sc``, ``time``,
    ``Path``, ``EMAIL``, ``DATASITES_DIR``, ``OUTBOX_DIR`` and ``state``.

    Returns:
        dict: ``{"status": "SyftMessage observer started"}``. The same value
        is returned when an observer was already running; in that case no
        second observer is created.
    """
    import html
    import sys
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler

    sys.stdout.reconfigure(line_buffering=True)

    class SyftMessageHandler(FileSystemEventHandler):
        """Route created/modified/deleted events to ``handle_event``."""

        def on_created(self, event):
            self.handle_event(event, "created")

        def on_modified(self, event):
            self.handle_event(event, "modified")

        def on_deleted(self, event):
            self.handle_event(event, "deleted")

        def handle_event(self, event, event_type):
            """Build and finalize one SyftMessage for a single file event.

            Errors are printed rather than raised so that one bad event does
            not stop the observer from handling later ones.
            """
            if event.is_directory:
                return

            file_path = Path(event.src_path)

            # Skip hidden and temp files
            if file_path.name.startswith('.') or file_path.suffix == '.tmp':
                return

            print(f"\n🔔 {event_type}: {file_path.name}", flush=True)

            try:
                # Resolve the datasite-relative path once, and before any
                # message is created, so a path outside DATASITES_DIR fails
                # without leaving a half-built message in the outbox.
                relative_path = file_path.relative_to(DATASITES_DIR)

                # Create SyftMessage
                recipient = "recipient@example.com"  # In practice, determined by datasite

                message = sc.SyftMessage.create(
                    sender_email=EMAIL,
                    recipient_email=recipient,
                    message_root=OUTBOX_DIR,
                    message_type="file_update"
                )

                # Add event metadata
                message.update_metadata({
                    "event_type": event_type,
                    "datasite_path": str(relative_path),
                    "timestamp": time.time()
                })

                # Add file if it exists (a deleted file has nothing to attach)
                if event_type != "deleted" and file_path.exists():
                    syftbox_path = f"/{EMAIL}/datasites/{relative_path}"
                    message.add_file(
                        source_path=file_path,
                        syftbox_path=syftbox_path,
                        permissions={
                            "read": [recipient],
                            "write": [EMAIL],
                            "admin": [EMAIL]
                        }
                    )

                # Add README. The file name is escaped because it is
                # arbitrary text and may contain '<', '>' or '&', which
                # would otherwise corrupt the generated HTML.
                safe_name = html.escape(file_path.name)
                message.add_readme(f"""
                <html><body>
                <h2>File Update</h2>
                <p>Event: {event_type}</p>
                <p>File: {safe_name}</p>
                <p>Time: {time.strftime('%Y-%m-%d %H:%M:%S')}</p>
                </body></html>
                """)

                # Finalize
                message.finalize()

                state["messages_created"] += 1
                state["last_event"] = {
                    "type": event_type,
                    "file": file_path.name,
                    "message_id": message.message_id
                }

                print(f"✅ Created message: {message.message_id}", flush=True)

            except Exception as e:
                # Deliberate best-effort: report and keep observing.
                print(f"❌ Error: {e}", flush=True)

    # Only one observer per process: repeated /start calls are no-ops.
    if state["observer"] is None:
        observer = Observer()
        observer.schedule(SyftMessageHandler(), str(DATASITES_DIR), recursive=True)
        observer.start()
        state["observer"] = observer

    return {"status": "SyftMessage observer started"}

def get_status():
    """Report the observer state kept in the module-level ``state`` dict.

    Returns:
        dict: ``running`` (True when an observer is stored in ``state``),
        ``messages_created`` and ``last_event`` copied from ``state``.
    """
    current_observer = state["observer"]
    report = {"running": current_observer is not None}
    report["messages_created"] = state["messages_created"]
    report["last_event"] = state["last_event"]
    return report

# Create a server exposing the observer's start and status endpoints
server = ss.create(
    name="syft_observer",
    dependencies=["watchdog"],
    endpoints={
        "/start": start_syft_observer,
        "/status": get_status
    }
)

print(f"🌐 Server: {server.url}")

# Start observer.
# The timeout keeps the cell from hanging forever if the server never
# answers, and raise_for_status() reports an HTTP error directly instead of
# letting it surface as a confusing JSON/KeyError on the next line.
response = requests.get(f"{server.url}/start", timeout=10)
response.raise_for_status()
print(f"✅ {response.json()['status']}")

### Test SyftMessage Creation

In [None]:
# Create a new data file in the watched folder.
# The literal is closed with a proper triple quote; .strip() removes the
# leading and trailing newlines that the multi-line layout introduces.
data_file = DATASITES_DIR / "experiment_results.csv"
data_file.write_text("""
experiment_id,accuracy,precision,recall
exp_001,0.92,0.89,0.94
exp_002,0.94,0.91,0.96
exp_003,0.93,0.90,0.95
""".strip())

print(f"📄 Created: {data_file.name}")

# Wait for processing
time.sleep(2)

# Check status. The timeout avoids hanging on an unresponsive server and
# raise_for_status() surfaces HTTP errors before the JSON body is parsed.
status_response = requests.get(f"{server.url}/status", timeout=10)
status_response.raise_for_status()
status = status_response.json()
print("\n📊 Status:")
print(f"  Messages created: {status['messages_created']}")
if status['last_event']:
    print(f"  Last event: {status['last_event']['type']} - {status['last_event']['file']}")
    print(f"  Message ID: {status['last_event']['message_id']}")

In [None]:
# List every message folder in the outbox with a short summary
print("📤 Messages in outbox:\n")

for msg_dir in sorted(OUTBOX_DIR.iterdir()):
    # Only directories whose name starts with "gdrive_" are inspected.
    if not msg_dir.is_dir() or not msg_dir.name.startswith("gdrive_"):
        continue

    print(f"📩 {msg_dir.name}")

    # Load the message; a folder that cannot be read is reported, not fatal.
    try:
        msg = sc.SyftMessage(msg_dir)
        metadata = msg.get_metadata()

        print(f"   Event: {metadata.get('event_type')}")
        print(f"   File: {metadata.get('datasite_path')}")
        print(f"   Ready: {msg.is_ready}")

        # The attached-files line is printed only when there are files
        files = msg.get_files()
        if files:
            print(f"   Files: {[f['filename'] for f in files]}")

    except Exception as e:
        print(f"   Error reading: {e}")

    print()

### View Observer Logs

In [None]:
# Fetch the last 20 stdout lines of the observer server
observer_stdout = ss.servers['syft_observer'].stdout
logs = observer_stdout.lines()[-20:]

print("📋 Recent observer logs:\n")
# Blank (whitespace-only) lines are filtered out before printing
for log in (line for line in logs if line.strip()):
    print(log)

## Multiple File Changes

Let's test with multiple file operations:

In [None]:
# Create a subdirectory inside the watched folder
project_dir = DATASITES_DIR / "ml_project"
project_dir.mkdir(exist_ok=True)

# Create multiple files, pausing between writes
files_created = []

# Model file
model_file = project_dir / "model.json"
model_file.write_text(json.dumps({
    "model_type": "neural_network",
    "layers": [128, 64, 32, 10],
    "activation": "relu"
}, indent=2))
files_created.append(model_file)
print(f"📄 Created: {model_file.relative_to(DATASITES_DIR)}")

time.sleep(0.5)

# Results file
results_file = project_dir / "results.txt"
results_file.write_text("Training accuracy: 95.2%\nValidation accuracy: 93.7%")
files_created.append(results_file)
print(f"📄 Created: {results_file.relative_to(DATASITES_DIR)}")

time.sleep(0.5)

# Config file.
# The literal is closed with a proper triple quote; .strip() removes the
# leading and trailing newlines that the multi-line layout introduces.
config_file = project_dir / "config.yaml"
config_file.write_text("""
training:
  epochs: 100
  batch_size: 32
  learning_rate: 0.001
""".strip())
files_created.append(config_file)
print(f"📄 Created: {config_file.relative_to(DATASITES_DIR)}")

print(f"\n✅ Created {len(files_created)} files")

In [None]:
# Wait and check status
time.sleep(3)

# The timeout avoids hanging on an unresponsive server and
# raise_for_status() surfaces HTTP errors before the JSON body is parsed.
status_response = requests.get(f"{server.url}/status", timeout=10)
status_response.raise_for_status()
status = status_response.json()
print(f"📊 Total messages created: {status['messages_created']}")

# Count message folders ("gdrive_*" directories) in the outbox without
# building an intermediate list.
message_count = sum(
    1
    for d in OUTBOX_DIR.iterdir()
    if d.is_dir() and d.name.startswith("gdrive_")
)
print(f"📤 Messages in outbox: {message_count}")

## Modifying Files

Let's test file modifications:

In [None]:
# Modify the results file.
# The literal is closed with a proper triple quote; .strip() removes the
# leading and trailing newlines that the multi-line layout introduces.
print("✏️  Modifying results.txt...")
results_file.write_text("""
Training accuracy: 96.8%
Validation accuracy: 94.9%
Test accuracy: 94.2%
""".strip())

time.sleep(2)

# Check the last 10 log lines for the modification event
logs = ss.servers['syft_observer'].stdout.lines()[-10:]
print("\n📋 Recent logs:")
for log in logs:
    if "modified" in log.lower() or "results.txt" in log:
        print(log)

## Examining a SyftMessage

Let's examine one of the created messages in detail:

In [None]:
# Collect the message folders ("gdrive_*" directories), sorted by path
message_dirs = sorted(
    d for d in OUTBOX_DIR.iterdir()
    if d.is_dir() and d.name.startswith("gdrive_")
)

if not message_dirs:
    print("No messages found in outbox")
else:
    # The last entry in sort order is the one examined
    latest_msg_dir = message_dirs[-1]
    print(f"📩 Examining: {latest_msg_dir.name}\n")

    # Load the message from its folder
    msg = sc.SyftMessage(latest_msg_dir)

    # Metadata fields; missing keys print as None
    metadata = msg.get_metadata()
    print("📋 Metadata:")
    print(f"  Message ID: {metadata.get('message_id')}")
    print(f"  Sender: {metadata.get('sender_email')}")
    print(f"  Recipient: {metadata.get('recipient_email')}")
    print(f"  Event Type: {metadata.get('event_type')}")
    print(f"  Datasite Path: {metadata.get('datasite_path')}")

    # Attached files: name, size, truncated hash, read permissions
    files = msg.get_files()
    print(f"\n📁 Files ({len(files)}):")
    for f in files:
        print(f"  - {f['filename']}")
        print(f"    Size: {f['file_size']} bytes")
        print(f"    Hash: {f['file_hash'][:16]}...")
        print(f"    Permissions: {f['permissions']['read']}")

    # Readiness flag
    print(f"\n✅ Ready to send: {msg.is_ready}")

    # README presence and size
    readme_path = latest_msg_dir.joinpath("README.html")
    if readme_path.exists():
        print(f"\n📄 Has README: Yes ({readme_path.stat().st_size} bytes)")

## Cleanup

Clean up test files and messages:

In [None]:
# Stop the observer if this process holds one.
# NOTE(review): if syft-serve runs endpoint functions in a separate process,
# this notebook-side `state` presumably never receives the observer and this
# branch is skipped — confirm against syft-serve's execution model.
if state["observer"]:
    state["observer"].stop()
    state["observer"].join()
    print("🛑 Observer stopped")

# Terminate the server
ss.servers.terminate_all()
print("🛑 Server terminated")

# Optional: Clean up test files
if input("\n🧹 Clean up test files? (y/n): ").strip().lower() == 'y':
    import shutil

    # Remove ONLY the files this tutorial wrote. Deleting everything under
    # DATASITES_DIR would also destroy the user's own datasite content.
    project_dir = DATASITES_DIR / "ml_project"
    tutorial_files = [
        DATASITES_DIR / "test_data.csv",
        DATASITES_DIR / "experiment_results.csv",
        project_dir / "model.json",
        project_dir / "results.txt",
        project_dir / "config.yaml",
    ]
    for f in tutorial_files:
        if f.is_file():
            f.unlink()
            print(f"  Deleted: {f.name}")

    # Remove the tutorial directory only when nothing else is left in it,
    # so files the user may have added there are never lost.
    if project_dir.is_dir():
        if any(project_dir.iterdir()):
            print(f"  Kept: {project_dir.name}/ (contains other files)")
        else:
            project_dir.rmdir()
            print(f"  Deleted: {project_dir.name}/")

    # Remove messages.
    # NOTE(review): this removes every "gdrive_*" folder in the outbox,
    # including any that were not produced by this tutorial.
    count = 0
    for msg_dir in OUTBOX_DIR.iterdir():
        if msg_dir.is_dir() and msg_dir.name.startswith("gdrive_"):
            shutil.rmtree(msg_dir)
            count += 1
    print(f"  Deleted {count} messages from outbox")

    print("\n✅ Cleanup complete")

## Summary

In this tutorial, we've learned how to:

1. **Create a file observer** using watchdog and syft-serve
2. **Monitor datasites folder** for file changes
3. **Automatically create SyftMessages** for file events
4. **Store messages in outbox** ready for transport
5. **Access observer logs** through syft-serve

### Key Concepts:

- **Event Types**: created, modified, deleted
- **SyftMessage Structure**: metadata, files, permissions, README
- **Transport Agnostic**: Messages work with any transport (GDrive, email, etc.)
- **Automatic Workflow**: No manual intervention needed

### Next Steps:

1. Integrate with transport mechanisms (Google Drive sync)
2. Add recipient determination logic based on datasites
3. Implement message batching for efficiency
4. Add filtering rules for specific file types
5. Create notification system for important changes