[Phase 4.5] Implement heartbeat and zombie detection #44

@zhexuany

Summary

Implement worker heartbeat for liveness tracking and zombie detection to reclaim jobs from dead workers.

Parent Epic

Dependencies

Design

Heartbeat

  • Each worker writes to /heartbeat/{pod_id} periodically
  • Contains timestamp and current job info
  • Combined with checkpoint for efficiency

Zombie Detection

  • Any worker can scan for stale heartbeats
  • Reclaim jobs from dead workers
  • Preserve checkpoint for resume

Tasks

4.5.1 Define Heartbeat Thread

  1. Create src/distributed/heartbeat.rs
  2. Define HeartbeatManager:
    • pod_id: String
    • tikv_client: TikvClient
    • current_job: Arc<Mutex<Option>>
  3. Define configuration:
    • heartbeat_interval (default: 30s)
    • stale_threshold (default: 5 minutes)
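The fields and defaults above could be laid out roughly as follows. This is only a sketch: `TikvClient` here is an empty placeholder for whatever client type the project uses, `String` as the job ID type is an assumption (the issue does not name the type inside `Option`), and `HeartbeatConfig`, `intervals_per_threshold()`, and `key()` are hypothetical helpers added for illustration.

```rust
use std::sync::{Arc, Mutex};
use std::time::Duration;

/// Placeholder for the project's real TiKV client handle (assumption).
#[derive(Clone, Default)]
pub struct TikvClient;

/// Tunables from task 4.5.1.
#[derive(Debug, Clone, PartialEq)]
pub struct HeartbeatConfig {
    /// How often the worker refreshes its heartbeat key.
    pub heartbeat_interval: Duration,
    /// Age after which a heartbeat counts as stale.
    pub stale_threshold: Duration,
}

impl Default for HeartbeatConfig {
    fn default() -> Self {
        Self {
            heartbeat_interval: Duration::from_secs(30),
            stale_threshold: Duration::from_secs(5 * 60),
        }
    }
}

impl HeartbeatConfig {
    /// Number of full heartbeat intervals that fit inside the stale threshold.
    pub fn intervals_per_threshold(&self) -> u64 {
        self.stale_threshold.as_secs() / self.heartbeat_interval.as_secs().max(1)
    }
}

pub struct HeartbeatManager {
    pub pod_id: String,
    pub tikv_client: TikvClient,
    /// ID of the job being processed, if any; `String` is an assumed ID type.
    pub current_job: Arc<Mutex<Option<String>>>,
    pub config: HeartbeatConfig,
}

impl HeartbeatManager {
    pub fn new(pod_id: impl Into<String>, tikv_client: TikvClient, config: HeartbeatConfig) -> Self {
        Self {
            pod_id: pod_id.into(),
            tikv_client,
            current_job: Arc::new(Mutex::new(None)),
            config,
        }
    }

    /// Key this worker writes its heartbeat to.
    pub fn key(&self) -> String {
        format!("/heartbeat/{}", self.pod_id)
    }
}
```

With the defaults, a worker has to miss about ten consecutive updates before its heartbeat counts as stale, which leaves room for transient TiKV or network hiccups.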

4.5.2 Implement Heartbeat Update

  1. update_heartbeat():
    • Write to /heartbeat/{pod_id}
    • Include: pod_id, hostname, current_job, last_seen, started_at
  2. start_background_thread():
    • Spawn thread
    • Loop: update, sleep(interval)
    • Stop on shutdown signal
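One possible shape for the update loop, using only the standard library. The in-memory `Store` stands in for TiKV, timestamps as Unix seconds are an assumption, and `HeartbeatHandle` is a hypothetical wrapper around the shutdown signal. The channel's `recv_timeout` doubles as the sleep, so a shutdown signal wakes the thread immediately instead of after a full interval.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Fields listed in task 4.5.2; timestamps are Unix seconds (assumption).
#[derive(Debug, Clone)]
pub struct HeartbeatRecord {
    pub pod_id: String,
    pub hostname: String,
    pub current_job: Option<String>,
    pub last_seen: u64,
    pub started_at: u64,
}

/// In-memory stand-in for TiKV, keyed by "/heartbeat/{pod_id}".
pub type Store = Arc<Mutex<HashMap<String, HeartbeatRecord>>>;

fn now_secs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0)
}

pub struct HeartbeatHandle {
    stop_tx: Sender<()>,
    thread: JoinHandle<()>,
}

impl HeartbeatHandle {
    /// Send the shutdown signal and wait for the thread to exit.
    pub fn stop(self) {
        let _ = self.stop_tx.send(());
        let _ = self.thread.join();
    }
}

pub fn start_background_thread(
    store: Store,
    pod_id: String,
    hostname: String,
    current_job: Arc<Mutex<Option<String>>>,
    interval: Duration,
) -> HeartbeatHandle {
    let (stop_tx, stop_rx) = mpsc::channel::<()>();
    let started_at = now_secs();
    let thread = thread::spawn(move || loop {
        // update_heartbeat(): overwrite this pod's record.
        let record = HeartbeatRecord {
            pod_id: pod_id.clone(),
            hostname: hostname.clone(),
            current_job: current_job.lock().unwrap().clone(),
            last_seen: now_secs(),
            started_at,
        };
        store
            .lock()
            .unwrap()
            .insert(format!("/heartbeat/{pod_id}"), record);
        // Sleep for `interval`, waking early if a shutdown signal arrives.
        match stop_rx.recv_timeout(interval) {
            Err(RecvTimeoutError::Timeout) => continue,
            _ => break, // signal received or handle dropped
        }
    });
    HeartbeatHandle { stop_tx, thread }
}
```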

4.5.3 Integrate with Checkpoint

  1. When processing a job:
    • Heartbeat included in checkpoint transaction
    • Single TiKV round trip for both
  2. When idle:
    • Standalone heartbeat update
  3. Net effect: reduced TiKV load, since a busy worker issues no separate heartbeat write
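The busy and idle paths can be illustrated with a toy key-value store that counts commits. Everything here is hypothetical: `FakeKv` models a commit as a single round trip (the simplification the task describes), the `/checkpoint/{job_id}` key layout is assumed, and the string-encoded values only keep the sketch short.

```rust
use std::collections::BTreeMap;

/// In-memory stand-in for a transactional TiKV client that counts commits.
#[derive(Default)]
pub struct FakeKv {
    pub data: BTreeMap<String, String>,
    pub round_trips: u32,
}

impl FakeKv {
    /// Apply all writes together; modeled as one round trip.
    pub fn commit(&mut self, writes: Vec<(String, String)>) {
        for (key, value) in writes {
            self.data.insert(key, value);
        }
        self.round_trips += 1;
    }
}

/// Busy path: the checkpoint and the heartbeat go out in the same commit.
pub fn save_checkpoint_with_heartbeat(
    kv: &mut FakeKv,
    pod_id: &str,
    job_id: &str,
    progress: &str,
    now: u64,
) {
    kv.commit(vec![
        (format!("/checkpoint/{job_id}"), progress.to_string()),
        (
            format!("/heartbeat/{pod_id}"),
            format!("last_seen={now};job={job_id}"),
        ),
    ]);
}

/// Idle path: nothing to checkpoint, so the heartbeat is written alone.
pub fn idle_heartbeat(kv: &mut FakeKv, pod_id: &str, now: u64) {
    kv.commit(vec![(
        format!("/heartbeat/{pod_id}"),
        format!("last_seen={now};job=none"),
    )]);
}
```

A side effect of this design is that a busy worker's heartbeat is only as fresh as its latest checkpoint, so the checkpoint cadence has to stay well under stale_threshold or the background thread has to keep running while a job is active.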

4.5.4 Define Zombie Reaper

  1. Create src/distributed/reaper.rs
  2. Define ZombieReaper:
    • tikv_client: TikvClient
    • stale_threshold: Duration
  3. Runs periodically (every 60s)
  4. Not leader-elected (all workers run it)
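A minimal sketch of the reaper struct, assuming the same 5-minute threshold as the heartbeat configuration. `TikvClient` is again an empty placeholder, and `scan_interval` and `worst_case_detection()` are hypothetical additions that make the timing explicit.

```rust
use std::time::Duration;

/// Placeholder for the project's real TiKV client handle (assumption).
#[derive(Clone, Default)]
pub struct TikvClient;

pub struct ZombieReaper {
    pub tikv_client: TikvClient,
    pub stale_threshold: Duration,
    /// Pause between scans; 60s per task 4.5.4 (field name is an assumption).
    pub scan_interval: Duration,
}

impl ZombieReaper {
    pub fn new(tikv_client: TikvClient) -> Self {
        Self {
            tikv_client,
            stale_threshold: Duration::from_secs(5 * 60),
            scan_interval: Duration::from_secs(60),
        }
    }

    /// Upper bound on the delay between a dead worker's last heartbeat and
    /// the first scan by this reaper that sees it as stale, ignoring the
    /// time the scan itself takes.
    pub fn worst_case_detection(&self) -> Duration {
        self.stale_threshold + self.scan_interval
    }
}
```

With the stated defaults that bound is 360 seconds for a single reaper. Because every worker runs its own unsynchronized reaper, the typical delay in a multi-worker cluster should be closer to the 5-minute threshold.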

4.5.5 Implement Zombie Detection

  1. find_stale_workers() -> Vec<String>:
    • Scan /heartbeat/ prefix
    • Filter where last_seen < now - threshold
  2. find_orphaned_jobs() -> Vec<JobRecord>:
    • Query jobs with status=Processing
    • Check if owner's heartbeat is stale
    • Or heartbeat key missing (a prefix scan cannot report an absent key, so this case is checked here, per job owner)
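The two queries might look like this over in-memory data. The `Heartbeats` map stands in for the result of a `/heartbeat/` prefix scan, the `JobRecord` fields and the extra `now` and `threshold_secs` parameters are assumptions made so the functions are deterministic, and a job whose owner has no heartbeat entry is treated as orphaned.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
pub enum JobStatus {
    Pending,
    Processing,
}

/// Assumed shape of a job record; the issue does not list its fields.
#[derive(Debug, Clone, PartialEq)]
pub struct JobRecord {
    pub id: String,
    pub status: JobStatus,
    pub owner: Option<String>,
}

/// pod_id -> last_seen (Unix seconds); stands in for a /heartbeat/ prefix scan.
pub type Heartbeats = HashMap<String, u64>;

fn is_stale(last_seen: u64, now: u64, threshold_secs: u64) -> bool {
    last_seen < now.saturating_sub(threshold_secs)
}

/// Pods whose heartbeat exists but is older than the threshold (sorted).
pub fn find_stale_workers(hb: &Heartbeats, now: u64, threshold_secs: u64) -> Vec<String> {
    let mut stale: Vec<String> = hb
        .iter()
        .filter_map(|(pod, last_seen)| is_stale(*last_seen, now, threshold_secs).then(|| pod.clone()))
        .collect();
    stale.sort();
    stale
}

/// Processing jobs whose owner has a stale or missing heartbeat.
pub fn find_orphaned_jobs(
    jobs: &[JobRecord],
    hb: &Heartbeats,
    now: u64,
    threshold_secs: u64,
) -> Vec<JobRecord> {
    jobs.iter()
        .filter(|job| job.status == JobStatus::Processing)
        .filter(|job| match job.owner.as_ref().and_then(|o| hb.get(o)) {
            Some(last_seen) => is_stale(*last_seen, now, threshold_secs),
            None => true, // no owner or no heartbeat key
        })
        .cloned()
        .collect()
}
```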

4.5.6 Implement Job Reclamation

  1. reclaim_job(job_id: &str) -> Result<bool>:
    • Transaction:
      • Read job (verify still Processing)
      • Read owner heartbeat (verify stale or missing)
      • Write: status=Pending, owner=null
    • Commit
    • Return true if reclaimed
  2. Preserve checkpoint (crucial!)
  3. Log reclamation event
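The read-verify-write sequence can be sketched with a mutex standing in for the TiKV transaction, so the two reads and the write happen atomically. The `Store`, `State`, and `JobRecord` shapes are assumptions, as are the extra `now` and `threshold_secs` parameters and the `String` error type. The point to notice is that the write touches only `status` and `owner`, which is what keeps the checkpoint available for resume.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

#[derive(Debug, Clone, PartialEq)]
pub enum JobStatus {
    Pending,
    Processing,
}

#[derive(Debug, Clone)]
pub struct JobRecord {
    pub status: JobStatus,
    pub owner: Option<String>,
    /// Opaque resume state; reclamation must leave it untouched.
    pub checkpoint: Option<String>,
}

#[derive(Default)]
pub struct State {
    pub jobs: HashMap<String, JobRecord>,
    /// pod_id -> last_seen (Unix seconds)
    pub heartbeats: HashMap<String, u64>,
}

/// The mutex stands in for a TiKV transaction (assumption).
pub struct Store(pub Mutex<State>);

pub fn reclaim_job(
    store: &Store,
    job_id: &str,
    now: u64,
    threshold_secs: u64,
) -> Result<bool, String> {
    let mut state = store.0.lock().map_err(|e| e.to_string())?;

    // Read job (verify still Processing).
    let owner = match state.jobs.get(job_id) {
        None => return Err(format!("job {job_id} not found")),
        Some(job) if job.status != JobStatus::Processing => return Ok(false),
        Some(job) => job.owner.clone(),
    };

    // Read owner heartbeat (verify stale); a missing key counts as stale.
    let alive = owner
        .as_ref()
        .and_then(|o| state.heartbeats.get(o))
        .map_or(false, |&seen| seen >= now.saturating_sub(threshold_secs));
    if alive {
        return Ok(false);
    }

    // Write status=Pending, owner=None; the checkpoint is not modified.
    let job = state.jobs.get_mut(job_id).expect("presence checked above");
    job.status = JobStatus::Pending;
    job.owner = None;
    Ok(true)
}
```

Re-verifying inside the transaction matters because every worker runs a reaper: two reapers can pick the same orphaned job, and only the first commit should report a reclaim.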

4.5.7 Implement Reaper Loop

  1. run():
    loop {
        orphaned = find_orphaned_jobs()
        for job in orphaned:
            if reclaim_job(job.id):
                log("Reclaimed job {job.id} from dead worker")
        sleep(60s)
    }
    
  2. Limit reclaims per iteration (prevent thundering herd)
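A sketch of the loop with a per-iteration cap, written against closures so it stays independent of the storage layer. `reap_once`, `max_reclaims`, and the `AtomicBool` shutdown flag are hypothetical names. The cap counts successful reclaims, so a job that another reaper already took does not use up the budget.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

/// One reaper pass: try orphaned jobs in order and stop after
/// `max_reclaims` successful reclaims. Returns the reclaimed job IDs.
pub fn reap_once<F, R>(find_orphaned_jobs: F, mut reclaim_job: R, max_reclaims: usize) -> Vec<String>
where
    F: FnOnce() -> Vec<String>,
    R: FnMut(&str) -> Result<bool, String>,
{
    let mut reclaimed = Vec::new();
    for job_id in find_orphaned_jobs() {
        if reclaimed.len() >= max_reclaims {
            break; // leave the rest for a later pass or another worker
        }
        match reclaim_job(&job_id) {
            Ok(true) => {
                println!("Reclaimed job {job_id} from dead worker");
                reclaimed.push(job_id);
            }
            Ok(false) => {} // another reaper won the race, or the owner is alive
            Err(e) => eprintln!("reclaim of {job_id} failed: {e}"),
        }
    }
    reclaimed
}

/// Repeat `reap_once` until `shutdown` is set, sleeping between passes.
pub fn run<F, R>(
    mut find_orphaned_jobs: F,
    mut reclaim_job: R,
    max_reclaims: usize,
    interval: Duration,
    shutdown: &AtomicBool,
) where
    F: FnMut() -> Vec<String>,
    R: FnMut(&str) -> Result<bool, String>,
{
    while !shutdown.load(Ordering::Relaxed) {
        reap_once(&mut find_orphaned_jobs, &mut reclaim_job, max_reclaims);
        thread::sleep(interval);
    }
}
```

Adding random jitter to the sleep would further spread the scans of different workers; that is a possible refinement, not something the task list asks for.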

4.5.8 Cleanup on Shutdown

  1. On graceful shutdown:
    • Stop heartbeat thread
    • Delete heartbeat key
    • Release any held jobs (return to Pending)
  2. On crash: no cleanup runs; the heartbeat goes stale and the reaper reclaims the job once stale_threshold has passed
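The graceful path could be sketched as below, assuming the heartbeat thread has already been stopped so it cannot rewrite the key afterwards. The `State` and `JobRecord` shapes are hypothetical in-memory stand-ins for the TiKV data. Jobs are released before the heartbeat key is deleted; if the process dies halfway, the remaining heartbeat simply goes stale and the reaper takes over.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
pub enum JobStatus {
    Pending,
    Processing,
}

#[derive(Debug, Clone)]
pub struct JobRecord {
    pub status: JobStatus,
    pub owner: Option<String>,
    pub checkpoint: Option<String>,
}

#[derive(Default)]
pub struct State {
    pub jobs: HashMap<String, JobRecord>,
    /// pod_id -> last_seen (Unix seconds)
    pub heartbeats: HashMap<String, u64>,
}

/// Release every job held by `pod_id`, then delete its heartbeat key.
/// Returns the IDs of the released jobs, sorted.
pub fn graceful_shutdown(state: &mut State, pod_id: &str) -> Vec<String> {
    let mut released = Vec::new();
    for (id, job) in state.jobs.iter_mut() {
        if job.status == JobStatus::Processing && job.owner.as_deref() == Some(pod_id) {
            job.status = JobStatus::Pending;
            job.owner = None; // checkpoint stays so another worker can resume
            released.push(id.clone());
        }
    }
    state.heartbeats.remove(pod_id);
    released.sort();
    released
}
```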

4.5.9 Add Metrics

  • heartbeat_updates_total
  • heartbeat_age_seconds (gauge)
  • reaper_jobs_reclaimed_total
  • reaper_stale_workers_found_total
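A dependency-free sketch of the four metrics using atomics. The issue does not say which metrics library or export format the project uses, so the `Metrics` struct, its method names, and the plain `name value` output from `render()` are all assumptions; a real implementation would more likely register these with the project's existing metrics crate.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Default)]
pub struct Metrics {
    pub heartbeat_updates_total: AtomicU64,
    /// Gauge: age of the most recently observed heartbeat.
    pub heartbeat_age_seconds: AtomicU64,
    pub reaper_jobs_reclaimed_total: AtomicU64,
    pub reaper_stale_workers_found_total: AtomicU64,
}

impl Metrics {
    /// Call after each successful heartbeat write.
    pub fn record_heartbeat(&self) {
        self.heartbeat_updates_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Set the age gauge from two Unix timestamps in seconds.
    pub fn observe_heartbeat_age(&self, now: u64, last_seen: u64) {
        self.heartbeat_age_seconds
            .store(now.saturating_sub(last_seen), Ordering::Relaxed);
    }

    /// Call once per reaper pass with that pass's counts.
    pub fn record_reaper_pass(&self, stale_workers: u64, reclaimed: u64) {
        self.reaper_stale_workers_found_total
            .fetch_add(stale_workers, Ordering::Relaxed);
        self.reaper_jobs_reclaimed_total
            .fetch_add(reclaimed, Ordering::Relaxed);
    }

    /// Render one `name value` line per metric.
    pub fn render(&self) -> String {
        format!(
            "heartbeat_updates_total {}\nheartbeat_age_seconds {}\nreaper_jobs_reclaimed_total {}\nreaper_stale_workers_found_total {}\n",
            self.heartbeat_updates_total.load(Ordering::Relaxed),
            self.heartbeat_age_seconds.load(Ordering::Relaxed),
            self.reaper_jobs_reclaimed_total.load(Ordering::Relaxed),
            self.reaper_stale_workers_found_total.load(Ordering::Relaxed),
        )
    }
}
```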

Acceptance Criteria

  • Heartbeat thread updates periodically
  • Heartbeat combined with checkpoint
  • Zombie reaper finds stale workers
  • Orphaned jobs reclaimed correctly
  • Checkpoint preserved on reclaim
  • Graceful shutdown cleans up
  • Metrics exported
  • Integration test: Kill worker, verify job reclaimed

Files to Create

  • src/distributed/heartbeat.rs
  • src/distributed/reaper.rs

Files to Modify

  • src/distributed/mod.rs
  • src/distributed/worker.rs (integrate heartbeat)
  • src/distributed/checkpoint.rs (combine with heartbeat)
