In [10]:
#| default_exp sync

# Sync Operations
> Remote synchronization engine for chasqui workflow automation

This module handles all communication between local and remote systems:

- Upload jobs from local queue to remote waiting directory
- Trigger remote agent to process waiting jobs
- Parse remote agent logs and update local database
- Download completed results

## Key Design Decisions

**Manual 2FA Requirement:** Every SSH login requires manual two-factor 
authentication, so all remote operations happen in a single SSH session to 
minimize authentication overhead.

**Lightweight Remote State:** Remote side uses append-only log files rather than 
a database for simplicity and robustness.

**Self-Perpetuating Queue:** Completed PBS jobs trigger the agent to submit 
waiting jobs, eliminating the need for cron.

## Workflow
```
1. Get QUEUED_LOCAL jobs from database
2. Upload VASP inputs to remote work directories
3. Generate and upload PBS scripts to waiting/
4. Trigger agent to submit jobs
5. Parse agent log to see what was submitted
6. Update local database with submission status
7. Check for completed jobs (flags)
8. Update database and optionally download results
```

In [11]:
#| export
import json
import os
import shlex
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict, Any

from fastcore.basics import patch

from chasqui.database import ChasquiDB
from chasqui.ssh import SSHConnection
from chasqui.templates import generate_pbs_script_from_job
from chasqui.agent import deploy_agent, trigger_agent, parse_agent_log

## Architecture

The sync operation follows this flow:
```
Local DB → SSH Connection → Remote System
   ↓                            ↓
QUEUED_LOCAL              waiting/*.sh
   ↓                            ↓
UPLOADED                  agent.sh (triggered)
   ↓                            ↓
SUBMITTED ←── agent.log ←── PBS Queue
   ↓
COMPLETED
```

The `sync()` function orchestrates all operations in a single SSH session.

## Sync Configuration

Store sync parameters in a configuration object.

In [12]:
#| export

class SyncConfig:
    """
    Plain settings holder consumed by `sync()`.

    Attributes:
        remote_host: Host name handed to `SSHConnection`.
        chasqui_remote_dir: Base chasqui directory on the remote side. May
            contain shell variables such as `$HOME`; `sync()` expands it on
            the remote host.
        max_queued: Forwarded to `deploy_agent` as `max_queued`.
        max_running: Forwarded to `deploy_agent` as `max_running`.
        auto_deploy_agent: When true, `sync()` calls `deploy_agent` on every run.
        download_results: Flag checked by `sync()` after the completion check
            (result download is still a TODO there).

    Example:
        >>> config = SyncConfig(
        ...     remote_host='bebop',
        ...     chasqui_remote_dir='$HOME/chasqui_remote',
        ...     max_queued=40,
        ...     max_running=30
        ... )
    """
    def __init__(
        self,
        remote_host: str = 'bebop',
        chasqui_remote_dir: str = '$HOME/chasqui_remote',
        max_queued: int = 40,
        max_running: int = 30,
        auto_deploy_agent: bool = True,
        download_results: bool = False
    ):
        # Where to connect and where chasqui lives on that machine
        self.remote_host, self.chasqui_remote_dir = remote_host, chasqui_remote_dir
        # Limits passed through to the remote agent
        self.max_queued, self.max_running = max_queued, max_running
        # Behaviour switches for the sync run itself
        self.auto_deploy_agent, self.download_results = auto_deploy_agent, download_results

## Helper Functions

Internal functions for sync operations.

In [13]:
#| export

def _upload_vasp_inputs(
    ssh: SSHConnection,
    job: Dict[str, Any],
    work_dir: str
) -> None:
    """
    Upload VASP input files to remote work directory.
    
    Args:
        ssh: Active SSH connection
        job: Job dictionary from database
        work_dir: Remote work directory (expanded path)
    """
    local_path = Path(job['local_path']).expanduser()
    
    if not local_path.exists():
        raise FileNotFoundError(f"Local job directory not found: {local_path}")
    
    # Create remote work directory
    ssh.run(f'mkdir -p {work_dir}')
    
    # Upload VASP input files
    for filename in ['POSCAR', 'INCAR', 'KPOINTS', 'POTCAR']:
        local_file = local_path / filename
        if local_file.exists():
            remote_file = f"{work_dir}/{filename}"
            ssh.upload(str(local_file), remote_file)
        # Note: POTCAR might not exist for some test cases

In [14]:
#| export

def _upload_pbs_script(
    ssh: SSHConnection,
    job: Dict[str, Any],
    work_dir: str,
    waiting_dir: str
) -> str:
    """
    Generate and upload PBS script to waiting directory.

    A copy of `job` gets `remote_work_dir` injected into its JSON-encoded
    `vasp_config` (the caller's dictionary is not modified), the script text
    is produced by `generate_pbs_script_from_job`, written to a local
    temporary file and uploaded as `<waiting_dir>/<job_id>.sh`. The temporary
    file is removed whether or not the write or the upload succeeds.

    Args:
        ssh: Active SSH connection
        job: Job dictionary from database; `vasp_config`, when present and
            non-empty, is a JSON string
        work_dir: Remote work directory (expanded)
        waiting_dir: Remote waiting directory (expanded)

    Returns:
        Remote script path
    """
    # Merge the work directory into whatever configuration the job carries.
    raw_config = job.get('vasp_config')
    config = json.loads(raw_config) if raw_config else {}
    config['remote_work_dir'] = work_dir

    job_with_workdir = job.copy()
    job_with_workdir['vasp_config'] = json.dumps(config)

    script = generate_pbs_script_from_job(job_with_workdir)

    # newline='\n' stops text mode from emitting CRLF on Windows, which would
    # corrupt a shell script destined for the remote host.
    tmp = tempfile.NamedTemporaryFile(
        mode='w', suffix='.sh', delete=False, encoding='utf-8', newline='\n'
    )
    tmp_path = tmp.name

    try:
        # Writing happens inside the try so a failed write cannot leak the
        # delete=False temporary file.
        with tmp:
            tmp.write(script)

        # Upload to waiting directory
        remote_script = f"{waiting_dir}/{job['job_id']}.sh"
        ssh.upload(tmp_path, remote_script)
        return remote_script
    finally:
        os.unlink(tmp_path)

In [15]:
#| export

def _check_completed_jobs(
    ssh: SSHConnection,
    completed_dir: str,
    jobs: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """
    Check for completion flags and parse them.
    
    Args:
        ssh: Active SSH connection
        completed_dir: Remote completed directory (expanded)
        jobs: List of job dictionaries to check
        
    Returns:
        List of completed job updates: [{'job_id': ..., 'status': 'DONE', 'pbs_id': ...}, ...]
    """
    completed = []
    
    for job in jobs:
        job_id = job['job_id']
        flag_file = f"{completed_dir}/{job_id}.flag"
        
        if ssh.exists(flag_file):
            # Read flag content
            flag_content = ssh.run(f'cat {flag_file}')
            lines = flag_content.strip().split('\n')
            
            if len(lines) >= 1:
                status = lines[0].strip()  # DONE or FAIL
                pbs_id = lines[1].strip() if len(lines) > 1 else None
                
                completed.append({
                    'job_id': job_id,
                    'status': status,
                    'pbs_id': pbs_id
                })
    
    return completed

## Main Sync Function

The orchestrator that performs all sync operations.

In [23]:
#| export

def sync(
    config: Optional[SyncConfig] = None,
    local_db_path: str = "$HOME/.chasqui/jobs.db",
    dry_run: bool = False
) -> Dict[str, Any]:
    """
    Synchronize local and remote job queues.

    This is the main orchestration function that:
    1. Uploads queued jobs to remote
    2. Triggers remote agent
    3. Syncs job status back to local DB
    4. Downloads completed results (optional, not implemented yet)

    All remote work happens inside a single `SSHConnection` session. A
    failure while preparing or uploading one job is printed and that job is
    skipped; it does not abort the rest of the sync.

    Args:
        config: SyncConfig object (if None, uses defaults)
        local_db_path: Path to local SQLite database
        dry_run: If True, only print how many QUEUED_LOCAL jobs would be
            uploaded; no SSH connection is opened and the returned counters
            are all zero

    Returns:
        Dictionary with sync statistics. `timestamp` is the local time at the
        start of the run as produced by `datetime.now().isoformat()`:
        {
            'uploaded': 5,
            'submitted': 3,
            'completed': 2,
            'failed': 0,
            'timestamp': '2025-10-28T10:30:00.123456'
        }

    Example:
        >>> from chasqui.sync import sync, SyncConfig
        >>> config = SyncConfig(remote_host='bebop')
        >>> result = sync(config)
        >>> print(f"Uploaded {result['uploaded']} jobs")
    """
    # Use default config if not provided
    if config is None:
        config = SyncConfig()

    # Initialize database
    db = ChasquiDB(local_db_path)

    # Ensure database is initialized: a failing probe query is treated as
    # "tables missing" and answered by creating them.
    try:
        db.get_jobs_by_state('QUEUED_LOCAL')  # Test if tables exist
    except Exception:
        db.init_db()  # Initialize if tables don't exist

    # Stats to return
    stats = {
        'uploaded': 0,
        'submitted': 0,
        'completed': 0,
        'failed': 0,
        'timestamp': datetime.now().isoformat()
    }

    if dry_run:
        print("[DRY RUN MODE]")
        queued_jobs = db.get_jobs_by_state('QUEUED_LOCAL')
        print(f"Would upload {len(queued_jobs)} jobs")
        return stats

    # Open SSH connection (single session for all operations)
    with SSHConnection(config.remote_host) as ssh:

        # Expand remote paths. The argument to echo is deliberately left
        # unquoted so the remote shell expands variables such as $HOME.
        chasqui_dir = ssh.run(f'echo {config.chasqui_remote_dir}').strip()
        waiting_dir = f"{chasqui_dir}/waiting"
        submitted_dir = f"{chasqui_dir}/submitted"
        completed_dir = f"{chasqui_dir}/completed"

        # Ensure directories exist
        ssh.run(f'mkdir -p {waiting_dir} {submitted_dir} {completed_dir} {chasqui_dir}/logs')

        # Deploy agent if needed
        if config.auto_deploy_agent:
            deploy_agent(
                ssh,
                chasqui_remote_dir=config.chasqui_remote_dir,
                max_queued=config.max_queued,
                max_running=config.max_running
            )

        # 1. UPLOAD PHASE: Get QUEUED_LOCAL jobs and upload them
        queued_jobs = db.get_jobs_by_state('QUEUED_LOCAL')

        for job in queued_jobs:
            job_id = job['job_id']

            # Everything job-specific lives inside the try, including config
            # parsing and remote path expansion, so that one job with a
            # malformed vasp_config cannot abort the whole sync.
            try:
                # Determine work directory
                work_dir = f"$HOME/scratch/vasp_jobs/{job_id}"
                if job.get('vasp_config'):
                    vasp_config = json.loads(job['vasp_config'])
                    work_dir = vasp_config.get('remote_work_dir', work_dir)

                # Expand work_dir (unquoted on purpose, see above)
                work_dir = ssh.run(f'echo {work_dir}').strip()

                # Upload VASP inputs
                _upload_vasp_inputs(ssh, job, work_dir)

                # Generate and upload PBS script
                _upload_pbs_script(ssh, job, work_dir, waiting_dir)

                # Update database
                db.update_state(job_id, 'UPLOADED', remote_path=work_dir)
                stats['uploaded'] += 1

            except Exception as e:
                print(f"Error uploading job {job_id}: {e}")
                continue

        # 2. TRIGGER AGENT: Submit waiting jobs (also when jobs uploaded by an
        # earlier sync are still waiting in state UPLOADED)
        if stats['uploaded'] > 0 or db.get_jobs_by_state('UPLOADED'):
            trigger_agent(ssh, config.chasqui_remote_dir)

        # 3. PARSE AGENT LOG: See what was submitted
        try:
            log_content = ssh.run(f'cat {chasqui_dir}/logs/agent.log')
            log_entries = parse_agent_log(log_content)

            # The whole log is replayed on every sync. That is harmless
            # because only jobs still in UPLOADED / QUEUED_LOCAL are promoted;
            # entries for jobs already past that point are ignored.
            for entry in log_entries:
                # .get() so one entry without an 'action' key does not raise
                # and discard all remaining entries.
                if entry.get('action') != 'AGENT_SUBMIT' or entry.get('status') != 'success':
                    continue

                job_id = entry.get('job')
                pbs_id = entry.get('pbs_id')
                if not job_id:
                    continue

                # Check if this job is in our database
                job = db.get_job(job_id)
                if job and job['state'] in ['UPLOADED', 'QUEUED_LOCAL']:
                    db.update_state(job_id, 'SUBMITTED', pbs_id=pbs_id)
                    stats['submitted'] += 1
        except Exception as e:
            print(f"Warning: Could not parse agent log: {e}")

        # 4. CHECK COMPLETED JOBS: Look for completion flags
        submitted_jobs = db.get_jobs_by_state('SUBMITTED')
        running_jobs = db.get_jobs_by_state('RUNNING')

        completed_updates = _check_completed_jobs(
            ssh,
            completed_dir,
            submitted_jobs + running_jobs
        )

        for update in completed_updates:
            # An empty status means the flag had no usable content; leave the
            # job untouched so the next sync can pick it up.
            if not update['status']:
                continue

            db.update_state(
                update['job_id'],
                update['status'],  # DONE or FAIL
                pbs_id=update.get('pbs_id')
            )

            if update['status'] == 'DONE':
                stats['completed'] += 1
            else:
                stats['failed'] += 1

        # 5. DOWNLOAD RESULTS (optional)
        if config.download_results:
            # TODO: Implement result download
            pass

    # Log sync operation (after the SSH session has been closed)
    db.log_sync(
        uploaded=stats['uploaded'],
        submitted=stats['submitted'],
        completed=stats['completed'],
        failed=stats['failed']
    )

    return stats

## Convenience Functions

Simplified interfaces for common operations.

In [24]:
#| export

def quick_sync(remote_host: str = 'bebop') -> Dict[str, Any]:
    """
    Run `sync` against one host, leaving every other setting at its default.

    Builds a `SyncConfig` in which only `remote_host` is overridden and hands
    it straight to `sync`, which then uses its own default database path and
    runs for real (no dry run).

    Args:
        remote_host: SSH host to connect to

    Returns:
        The statistics dictionary returned by `sync`

    Example:
        >>> from chasqui.sync import quick_sync
        >>> result = quick_sync('bebop')
        >>> print(result)
    """
    return sync(SyncConfig(remote_host=remote_host))

## Tests

Basic functionality tests.

In [25]:
#| hide
# Standard-library helpers for the throwaway database file used below
import tempfile
import os

# Test SyncConfig: explicit arguments must be stored unchanged on the instance
config = SyncConfig(
    remote_host='test_host',
    max_queued=50,
    max_running=25
)

assert config.remote_host == 'test_host'
assert config.max_queued == 50
assert config.max_running == 25
print("✓ SyncConfig works")

# Test dry run with temporary database
# delete=False keeps the (empty) file on disk after the `with` block closes it,
# so only its path is carried forward and the file can be reopened by name.
with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as tmp:
    test_db_path = tmp.name

try:
    # Initialize database
    db = ChasquiDB(test_db_path)
    db.init_db()
    
    # Test dry run: with dry_run=True, sync() returns before opening any SSH
    # connection, so 'test_host' never has to be reachable.
    result = sync(config, local_db_path=test_db_path, dry_run=True)
    assert 'uploaded' in result
    assert 'timestamp' in result
    assert result['uploaded'] == 0  # No jobs in test DB
    print("✓ Dry run works")
    
finally:
    # Cleanup: remove the temporary database even if an assertion failed
    os.unlink(test_db_path)

print("\n✅ All sync module tests passed!")

✓ SyncConfig works
[DRY RUN MODE]
Would upload 0 jobs
✓ Dry run works

✅ All sync module tests passed!


## Usage Examples

### Basic Sync
```python
from chasqui.sync import sync, SyncConfig

# Configure
config = SyncConfig(
    remote_host='bebop',
    max_queued=40,
    max_running=30
)

# Sync
result = sync(config)

print(f"Uploaded: {result['uploaded']}")
print(f"Submitted: {result['submitted']}")
print(f"Completed: {result['completed']}")
print(f"Failed: {result['failed']}")
```

### Quick Sync (with defaults)
```python
from chasqui.sync import quick_sync

result = quick_sync('bebop')
print(result)
```

### Complete Workflow Example
```python
from chasqui.database import ChasquiDB
from chasqui.sync import sync, SyncConfig

# 1. Create jobs locally
db = ChasquiDB()
db.init_db()

job_id = db.create_job(
    local_path="/path/to/vasp_job",
    vasp_config={
        "job_name": "Au_relax",
        "cores": 2,
        "walltime": "24:00:00",
        "project": "AARC1"
    }
)

# 2. Queue for submission
db.update_state(job_id, 'QUEUED_LOCAL')

# 3. Sync (uploads, submits, checks status)
config = SyncConfig(remote_host='bebop')
result = sync(config)

print(f"Job {job_id} uploaded and submitted!")
```