Design: Daft Native Kubernetes Support #6639
Replies: 4 comments
Excellent write up here @chenghuichen thank you!
Thanks @chenghuichen! We are actually working on this actively alongside the shuffle improvements and plan to have initial versions out towards the end of Q2.
@ohbh Great to hear it's already in progress! Curious whether the approach you have in mind aligns with what's outlined here; happy to contribute based on where things are heading.
Hey @chenghuichen, sorry for the slow response. What you have outlined here is very close to what we're thinking. The reason we haven't published a roadmap is that we're still deciding on some of the internal contracts we're using and what the surface of the backend will be. We do intend for Flight to be the replacement for data transfer between query stages instead of Ray refs. We've done some cleanup recently around those abstractions in #6627 (where we removed [...]). We also do intend to use gRPC services for the scheduler/worker, but are considering how much "k8s" we'll want to expose in Daft itself, for example [...]. I'll follow up here once we have a clearer path outlined, thanks!
1. Motivation
Daft's distributed architecture (Flotilla) exposes `Worker` and `WorkerManager` as pluggable interfaces, providing the foundation for running distributed Daft jobs directly on Kubernetes. Users no longer need to introduce Ray or KubeRay: they can submit Daft jobs on existing K8s platforms (Argo Workflows, Airflow KubernetesPodOperator, etc.) without any additional layers.
2. Current State
2.1 Reusable As-Is
[...] `CancellationToken`-based cancellation; backend-agnostic
2.2 New Components Required
- K8sWorker: implements the `Worker` trait; holds a gRPC channel (control plane) and a Flight channel (data plane), both pointing to the same Worker Pod
- K8sWorkerManager: implements the `WorkerManager` trait; manages Worker Pod lifecycle via kube-rs (create, delete, Watch+Poll state tracking); includes built-in autoscaling logic
- K8sTaskResultHandle: implements the `TaskResultHandle` trait; async chain: gRPC `AwaitTask` (versioned long-poll) → Flight `DoGet` (fetch output partitions)
- Flight server extension: `do_get` for task output partition tickets (`task_id:partition_idx`)
- K8sRunner: implements the `Runner` interface; exposes the `set_runner_k8s()` public API
2.3 Relationship to the Shuffle Improvements Design
The Shuffle Improvements Design is planning a generic `Shuffler<ShuffleID>` trait and a `TaskOutput<T: Shuffler>` structure. The K8s runner's data plane should integrate with this system rather than maintaining a separate partition reference mechanism.
Concretely: the K8s runner aligns its task output references with the existing `FlightShufflePartitionRef` (which contains `server_address`, `shuffle_id`, and `partition_idx`), going through the `TaskOutput<FlightShuffler>` path without introducing new types. This means the scheduler's `Dispatcher` requires no K8s-specific handling.
3. Architecture
3.1 Component Diagram
3.2 Usage
The simplest path is the Python API, suitable for development and debugging:
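A minimal sketch of what such a call could look like. `set_runner_k8s()` is the API proposed in this design and is not available in released Daft, so the stand-in below only validates and records the configuration. Parameter names follow the list under P1-6; the default values, the namespace, and the `K8sRunnerConfig` class are assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class K8sRunnerConfig:
    """Hypothetical container for the parameters listed under P1-6."""
    namespace: str
    image: str
    num_workers: int = 2          # assumed default
    worker_cpu: str = "4"         # assumed default, K8s quantity syntax
    worker_memory: str = "16Gi"   # assumed default, K8s quantity syntax
    worker_gpu: int = 0
    service_account: Optional[str] = None
    node_selector: dict = field(default_factory=dict)
    tolerations: list = field(default_factory=list)


def set_runner_k8s(**kwargs) -> K8sRunnerConfig:
    """Stand-in for the proposed daft.set_runner_k8s(); no cluster is contacted."""
    config = K8sRunnerConfig(**kwargs)
    if config.num_workers < 1:
        raise ValueError("num_workers must be at least 1")
    return config


config = set_runner_k8s(
    namespace="data-jobs",                        # assumed namespace
    image="ghcr.io/eventual-inc/daft:<version>",  # image name from P1-7; substitute a real tag
    num_workers=4,
    worker_cpu="8",
    worker_memory="32Gi",
)
```

In the real API the call would presumably be followed by ordinary DataFrame code, with the runner creating Worker Pods behind the scenes.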
For production and CI, the recommended path is to submit the Driver as a K8s Job via the Helm Chart:
The Helm Chart creates the ServiceAccount and RBAC (granting the Driver Pod permission to manage Worker Pods) and submits the Driver Job. Once the Driver Job starts, `K8sRunner` inside the Driver Pod takes over Worker Pod lifecycle management.
For Argo Workflows or Airflow KubernetesPodOperator, wrap the Driver script as a workflow step or operator task. The integration cost is identical to a regular K8s Job: no additional configuration required.
4. Key Design Decisions
4.1 gRPC for Control Plane Only; Flight for All Data
gRPC (:7777) carries control signals only — no data:
- `SubmitTask`: submit a task, fire-and-forget, returns task ID immediately
- `AwaitTask`: wait for task completion, versioned long-poll, returns a Flight ticket (no partition data)
- `CancelTask`, `DropPartitions`, `Check` (Health), Actor teardown
Arrow Flight (:7778) handles all data traffic:
- Task output partitions: `DoGet(task_id:partition_idx)`
- `DoPut`/`DoGet` (existing logic, no changes needed)
Task output is `MicroPartition` (Arrow data). Sending it as gRPC bytes introduces double encoding (Arrow IPC → protobuf bytes → gRPC frame). Flight's `FlightData` carries Arrow IPC buffers directly with zero extra wrapping, and Workers already run a Flight server, so no new infrastructure is needed.
4.2 AwaitTask: Versioned Long-Polling
`SubmitTask` is fire-and-forget and returns immediately. `AwaitTask` uses versioned long-polling: each poll blocks for at most 2s (randomized to prevent thundering herd); if there is no state change the Worker returns the current state and the Coordinator re-polls. The response carries a Flight ticket, not the actual data.
This is more robust than holding a long-lived gRPC stream: after a network blip the Coordinator simply retries the request with no need to distinguish first connection from reconnection.
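The versioned long-poll can be sketched in-process, with a condition variable standing in for the gRPC call. This is an illustration under assumptions, not the proposed implementation: the `WorkerTaskTable` class, the state names, and the jitter range are hypothetical, and only the 2s cap and the `task_id:partition_idx` ticket shape come from the text.

```python
import random
import threading
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskStatus:
    version: int            # bumped on every state change
    state: str              # assumed state names: "RUNNING", "FINISHED"
    ticket: Optional[str]   # Flight ticket once finished; never the partition data


class WorkerTaskTable:
    """Worker-side task states, guarded by a condition variable."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._tasks = {}  # task_id -> TaskStatus

    def submit(self, task_id: str) -> str:
        # Fire-and-forget: record the task and return its ID immediately.
        with self._cond:
            self._tasks[task_id] = TaskStatus(1, "RUNNING", None)
        return task_id

    def finish(self, task_id: str, partition_idx: int = 0) -> None:
        with self._cond:
            prev = self._tasks[task_id]
            ticket = f"{task_id}:{partition_idx}"
            self._tasks[task_id] = TaskStatus(prev.version + 1, "FINISHED", ticket)
            self._cond.notify_all()

    def await_task(self, task_id: str, known_version: int, max_wait_s: float = 2.0) -> TaskStatus:
        # Block until the version differs from what the caller has seen,
        # or until a jittered timeout of at most max_wait_s expires.
        timeout_s = random.uniform(0.5, 1.0) * max_wait_s
        with self._cond:
            self._cond.wait_for(
                lambda: self._tasks[task_id].version != known_version,
                timeout=timeout_s,
            )
            return self._tasks[task_id]


def wait_until_finished(table: WorkerTaskTable, task_id: str, max_wait_s: float = 2.0) -> str:
    """Coordinator side: re-poll with the last seen version until a ticket arrives."""
    known_version = 0
    while True:
        status = table.await_task(task_id, known_version, max_wait_s)
        if status.state == "FINISHED":
            return status.ticket
        known_version = status.version
```

Because every request carries the last version the Coordinator saw, a retry after a dropped connection is indistinguishable from a normal re-poll, which is the robustness property described above.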
4.3 StatefulSet Mode
In direct Pod mode, a Pod's IP changes on restart. For workloads using the local disk shuffle backend, shuffle metadata contains the Worker's Flight `server_address`; an IP change invalidates that metadata. StatefulSet mode solves this by providing stable DNS names, an approach borrowed from Spark on Kubernetes's `StatefulSetPodsAllocator`, which was introduced for the same reason.
With respect to shuffle backend: local disk shuffle calls for StatefulSet mode; when using a remote shuffle service, partition refs point to an external service endpoint that is inherently stable, so direct Pod mode suffices.
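As an illustration of the stable name, a StatefulSet Pod behind a headless Service is reachable at `<statefulset>-<ordinal>.<service>.<namespace>.svc.<cluster-domain>`. The helper below builds a `server_address` in that shape; the function name and the resource names in the usage are hypothetical, and `cluster.local` is the common default cluster domain rather than something this design specifies.

```python
def statefulset_flight_address(
    statefulset: str,
    ordinal: int,
    headless_service: str,
    namespace: str,
    flight_port: int = 7778,                # Flight port used in this design
    cluster_domain: str = "cluster.local",  # assumed default cluster domain
) -> str:
    """Returns a host:port that stays the same across Pod restarts."""
    pod_name = f"{statefulset}-{ordinal}"
    return f"{pod_name}.{headless_service}.{namespace}.svc.{cluster_domain}:{flight_port}"


address = statefulset_flight_address("daft-worker", 0, "daft-workers", "data-jobs")
```

Shuffle metadata that stores this address instead of a Pod IP remains valid after the Pod is rescheduled.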
Pod restart detection (Incarnation UUID): in StatefulSet mode the Pod DNS stays constant, but the process can restart. Each time a Worker process starts it generates a new random UUID (the `instance_id` field), which is included in every `AwaitTask` response. When the Coordinator detects a UUID change it immediately triggers `WorkerDied` for all in-flight tasks on that Worker and reschedules them, avoiding a deadlock where the Coordinator waits forever for a task that no longer exists.
4.4 Bidirectional Failure Detection
Coordinator detects Worker failure: Watch/Poll observes a Pod entering Failed/OOMKilled/Evicted state, triggers `WorkerDied`, and the existing scheduling retry path takes over.
Worker detects Coordinator unreachable: during a network partition the Coordinator may be unable to communicate with Workers for an extended period while GPU tasks continue running. If no new `AwaitTask` request arrives within a configurable timeout (default 120s), the Worker proactively cleans up orphaned tasks and releases GPU and other scarce resources.
5. Work Breakdown
Phase 1: Core Functionality (MVP)
Goal: users can run `daft.set_runner_k8s()` on a kind cluster to execute TPC-H Q1 (including shuffle), with Worker Pods created and destroyed correctly and failed tasks rescheduled.
P1-1 Worker Control Plane gRPC Server
Implement a gRPC server inside the Worker binary (control plane, :7777) with the following RPCs: submit task (fire-and-forget), await task completion (versioned long-poll, returns Flight ticket), cancel task, partition GC notification, and health check (standard gRPC Health Checking Protocol, `SERVING`/`NOT_SERVING`).
The Worker starts in `NOT_SERVING` state and transitions to `SERVING` once `NativeExecutor` finishes initializing. The K8s readinessProbe uses this to gate traffic.
P1-2 Worker Data Plane Flight Server Extension
Extend the existing Flight server (:7778) to support two ticket types in `do_get`: task output partitions (`task_id:partition_idx`), which retrieve the corresponding `MicroPartition` from the local partition cache and return it as an Arrow IPC stream; and cross-node partitions, also served from local cache for other Workers to pull as inputs.
Partition cache management aligns with the `Shuffler` trait from the Shuffle Improvements Design; partition references reuse the `FlightShufflePartitionRef` structure.
P1-3 K8sWorker + K8sTaskResultHandle (Rust)
Implement the `Worker` trait (id, CPU/GPU resource reporting, active task details) and the `TaskResultHandle` trait (`get_result` async chain: gRPC `AwaitTask` to obtain a ticket, then Flight `DoGet` to fetch data; `cancel_callback` sends gRPC `CancelTask`). `K8sWorker` holds one gRPC channel and one Flight channel, both pointing to the same Pod on different ports.
P1-4 K8sWorkerManager (Rust)
Implement the `WorkerManager` trait:
- `submit_tasks_to_workers`: synchronous; issues gRPC `SubmitTask` via `tokio::runtime::Handle::block_on` and returns the handle list immediately.
- `worker_snapshots`: triggers a Pod state refresh and returns the resource snapshots needed by the scheduler.
- `mark_worker_died`: removes the worker from the pool.
- `try_autoscale`: creates new Worker Pods and deletes idle ones.
- `shutdown`: deletes all Worker Pods.
Pod state tracking: Watch (incremental, real-time) + Poll (full snapshot, every 30s) run in parallel. Poll provides eventual consistency as a safety net; the missing-Pod detection threshold is 60s.
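The Watch + Poll combination can be sketched as a small tracker that records when each Pod was last confirmed. The sketch is in Python for brevity, although the component itself is planned in Rust. It assumes the 60s threshold means "absent from full listings for at least 60s since it was last seen"; the `PodTracker` class and the event names are hypothetical and do not use the kube-rs API.

```python
from dataclasses import dataclass, field

MISSING_POD_THRESHOLD_S = 60.0  # from the text; the Poll itself runs every 30s


@dataclass
class PodTracker:
    """Tracks when each Worker Pod was last confirmed to exist."""
    last_seen: dict = field(default_factory=dict)  # pod name -> timestamp in seconds

    def on_watch_event(self, pod: str, event: str, now: float) -> list:
        """Applies one incremental Watch event; returns Pods to mark as died."""
        if event == "DELETED":
            self.last_seen.pop(pod, None)
            return [pod]
        self.last_seen[pod] = now  # ADDED or MODIFIED
        return []

    def on_poll_snapshot(self, listed_pods: set, now: float) -> list:
        """Applies a full listing; catches deletions whose Watch event was lost."""
        for pod in listed_pods:
            self.last_seen[pod] = now
        died = [
            pod for pod, seen in self.last_seen.items()
            if pod not in listed_pods and now - seen >= MISSING_POD_THRESHOLD_S
        ]
        for pod in died:
            del self.last_seen[pod]
        return died
```

Each returned name would be passed to `mark_worker_died`, so a missed Watch event delays detection by at most the threshold instead of leaving a dead Worker in the pool.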
P1-5 Pod Lifecycle Management
Worker Pod spec includes: CPU/GPU/memory requests=limits (Guaranteed QoS), an emptyDir volume mounted as the shuffle directory, a readinessProbe (gRPC Health Check), a livenessProbe, and a `daft.io/job-id` label for filtering.
All created resources set `ownerReferences` pointing to the Driver Pod; K8s GC cascades deletion of all child resources when the Driver Pod is removed. This means that if the Driver restarts (e.g. K8s Job backoff retry), the job starts from scratch; resuming an in-progress job is not supported.
Built-in autoscaling: scale-out triggers when pending tasks exceed current capacity × 1.25, capped at 10 new Pods per round and a global limit of 50 pending Pods. Workers idle for more than 120s are scaled in.
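The autoscaling thresholds can be read as the following decision functions. The constants come from the paragraph above; how many Pods to request once the trigger fires is not specified, so sizing by the task deficit divided by `slots_per_pod` is an assumption, as are the function names and the idea that capacity is counted in task slots.

```python
import math

SCALE_OUT_FACTOR = 1.25
MAX_NEW_PODS_PER_ROUND = 10
MAX_PENDING_PODS = 50
IDLE_TIMEOUT_S = 120.0


def pods_to_add(pending_tasks: int, capacity_slots: int, pending_pods: int, slots_per_pod: int) -> int:
    """Number of Worker Pods to create in this autoscaling round."""
    if pending_tasks <= capacity_slots * SCALE_OUT_FACTOR:
        return 0  # backlog is within 1.25x of current capacity
    # Assumed sizing rule: enough Pods to cover tasks beyond current capacity.
    wanted = math.ceil((pending_tasks - capacity_slots) / slots_per_pod)
    headroom = max(0, MAX_PENDING_PODS - pending_pods)
    return min(wanted, MAX_NEW_PODS_PER_ROUND, headroom)


def pods_to_remove(idle_seconds_by_pod: dict) -> list:
    """Workers idle for longer than the timeout are scaled in."""
    return sorted(pod for pod, idle in idle_seconds_by_pod.items() if idle > IDLE_TIMEOUT_S)
```

For example, with 8 slots of capacity and 4 slots per Pod, a backlog of 10 tasks adds nothing, while a backlog of 20 tasks requests 3 Pods; a very large backlog is clipped first by the per-round cap and then by the remaining pending-Pod headroom.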
P1-6 Python API
`K8sRunner` implements the `Runner` interface (`run`, `run_iter`, `runner_io`). The public API is `set_runner_k8s()`, with core parameters: `namespace`, `image`, `num_workers`, `worker_cpu`, `worker_memory`, `worker_gpu`, `service_account`, `node_selector`, `tolerations`. Environment variable configuration is available via `DAFT_RUNNER=k8s` and the corresponding `DAFT_K8S_*` family.
P1-7 Container Image and Helm Chart (Basic)
Official Daft Worker base image (`ghcr.io/eventual-inc/daft:<version>`) containing the Python runtime, daft installation, and Worker binary entrypoint. A GPU variant (`daft:<version>-cu121`) is based on the nvidia/cuda base image. Built by GitHub Actions, tied to Release tags, with amd64 + arm64 multi-arch support.
New `k8s/charts/daft/` Helm chart supporting Job mode (Driver Pod as a K8s Job). Core templates: ServiceAccount, RBAC (Pod management permissions), Driver Job, values.yaml.
P1-8 Integration Tests
Spin up a kind cluster (3 nodes) in CI, build and load the daft-worker image into the cluster, and run basic integration tests: filter/collect, groupby/agg (with shuffle), and Worker failure rescheduling.
Phase 2: Production-Ready
Goal: support enterprise production deployments covering GPU workloads, Actor UDFs, security compliance, and observability.
P2-1 Actor UDF Lifecycle (GPU Model Loading)
On Ray, Actor UDFs map directly onto Ray's native Actor abstraction and sticky routing is handled automatically. Kubernetes has no equivalent primitive, so the affinity must be enforced explicitly at the scheduling layer.
GPU inference UDFs (e.g. vLLM, HuggingFace models) load model weights into the Worker process once and reuse them across subsequent calls. This means tasks of this type must always be routed to the same Worker Pod — they cannot float across arbitrary Workers the way stateless tasks do.
To support this, we distinguish two Pod types: Stateless Workers and Actor Workers. Actor Workers use `WorkerAffinity` scheduling to ensure tasks are always routed to the same Pod, are assigned a higher PriorityClass to reduce eviction risk, and are protected by a PodDisruptionBudget.
Graceful Actor teardown: the Coordinator notifies the Worker via gRPC to finish its current task and then exit; the Worker releases GPU memory; after a timeout the Coordinator force-deletes the Pod.
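The sticky routing requirement can be illustrated with a small router that pins each actor to one Actor Worker and spreads stateless tasks round-robin. This is a sketch of the idea only: the `ActorAffinityRouter` class, the least-loaded placement rule, and the worker names are hypothetical, and it does not model Daft's actual `WorkerAffinity` scheduling strategy.

```python
from typing import Optional


class ActorAffinityRouter:
    """Pins each actor to one Worker Pod; stateless tasks may go anywhere."""

    def __init__(self, actor_workers: list, stateless_workers: list) -> None:
        self._actor_workers = list(actor_workers)
        self._stateless_workers = list(stateless_workers)
        self._pinned = {}          # actor_id -> worker id
        self._next_stateless = 0   # round-robin cursor

    def route(self, actor_id: Optional[str] = None) -> str:
        if actor_id is None:
            index = self._next_stateless % len(self._stateless_workers)
            self._next_stateless += 1
            return self._stateless_workers[index]
        if actor_id not in self._pinned:
            # First task for this actor: choose the Actor Worker hosting the fewest actors.
            load = {worker: 0 for worker in self._actor_workers}
            for worker in self._pinned.values():
                load[worker] += 1
            self._pinned[actor_id] = min(self._actor_workers, key=lambda w: load[w])
        return self._pinned[actor_id]

    def on_worker_died(self, worker: str) -> list:
        """Drops pins to a dead Worker; returns actors that must reload their model."""
        if worker in self._actor_workers:
            self._actor_workers.remove(worker)
        lost = sorted(a for a, w in self._pinned.items() if w == worker)
        for actor_id in lost:
            del self._pinned[actor_id]
        return lost
```

The pin table is the only state that distinguishes actor tasks from stateless ones here, which is also the shape of the simpler alternative raised in open question O2.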
P2-2 GPU Isolation
The K8s NVIDIA device plugin sets `CUDA_VISIBLE_DEVICES` to device UUID format after allocating a GPU. At startup, the Worker binary checks whether this variable has already been set by the device plugin and skips any override if so (the current Ray implementation uses ordinal format and overrides unconditionally, which causes incorrect device mapping).
MIG partitioning, time-slicing, and similar features are configurable via the `gpu_resource_name` parameter.
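The "respect an existing assignment" rule is small enough to show directly. The sketch below assumes the variable arrives in the Worker's environment as described above; the function name and the ordinal fallback are illustrative and are not part of the Worker binary.

```python
from typing import Mapping, Optional, Sequence


def resolve_cuda_visible_devices(
    environ: Mapping[str, str],
    fallback_ordinals: Sequence[int],
) -> Optional[str]:
    """Returns the value the Worker should run with, never overriding an existing one."""
    existing = environ.get("CUDA_VISIBLE_DEVICES")
    if existing is not None:
        # Already assigned from outside (UUID format such as "GPU-..."): keep it as-is.
        # An explicitly empty value is also respected, since it means "no GPUs".
        return existing
    if not fallback_ordinals:
        return None
    # Nothing was assigned: fall back to ordinal format.
    return ",".join(str(index) for index in fallback_ordinals)
```

A Worker would call this once at startup with `os.environ` and only write the variable when the result differs from what is already set.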
P2-3 OOM Handling and Recovery Mode
When Watch/Poll detects an OOMKilled Pod, it triggers `WorkerDied` and the existing rescheduling path takes over.
After N consecutive Worker OOMKills (configurable, default 2), the runner enters recovery mode: new Worker Pods are created with reduced concurrency to lower the memory peak. A `daft_worker_oomkilled_total` metric is recorded and a recommendation to increase `worker_memory` is printed to the logs.
P2-4 StatefulSet Allocation Mode (inspired by Spark's StatefulSetPodsAllocator)
Implement an optional StatefulSet allocator: scale via the replicas field, backed by a Headless Service for stable DNS, with Flight server addresses using DNS instead of Pod IP (see §4.3). Applicable when the shuffle backend is local disk. Helm chart gains a `workers.allocationMode: statefulset` configuration option.
P2-5 Dependency Distribution
Three-stage path from development to production:
- local path: the Driver packages `working_dir` as a zip and uploads it to S3/GCS; Worker init containers download and extract it; extra pip packages are installed via init container
- remote URI: `working_dir="s3://my-bucket/deps/bundle.zip"` for pre-uploaded bundles in CI
- `local://` URI: `working_dir="local:///app/my_project"` for files already baked into the image; the Worker adds the path to `sys.path` with no upload or download
Custom images are the recommended production path; the above mechanisms are primarily for development iteration.
P2-6 Helm Chart Hardening
Add NetworkPolicy templates (Worker-to-Worker Flight traffic, Coordinator→Worker gRPC, Worker→external storage), optional HPA, ServiceMonitor (Prometheus Operator), Grafana Dashboard ConfigMap, multi-architecture GPU node support (nodeSelector + tolerations).
P2-7 Observability
Workers expose Prometheus metrics: task throughput (submitted/completed/failed), task latency P50/P99, shuffle read/write bytes, Worker CPU/memory utilization, OOMKill count.
Structured JSON logs to stdout with job_id/query_id/task_id/worker_id fields for per-query correlation in Loki/ELK. The Driver prints the corresponding kubectl log command when submitting a query to lower the debugging barrier.
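One way to produce such log lines with only the standard library is a custom `logging.Formatter`. The four correlation field names come from the text; the remaining keys (`ts`, `level`, `message`), the logger name, and the sample IDs are assumptions for illustration.

```python
import io
import json
import logging

CORRELATION_FIELDS = ("job_id", "query_id", "task_id", "worker_id")


class JsonFormatter(logging.Formatter):
    """Renders each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": self.formatTime(record),
            "level": record.levelname,
            "message": record.getMessage(),
        }
        # Values passed via `extra=` become attributes on the record.
        for name in CORRELATION_FIELDS:
            value = getattr(record, name, None)
            if value is not None:
                payload[name] = value
        return json.dumps(payload)


def make_logger(stream) -> logging.Logger:
    logger = logging.getLogger("daft.worker.sketch")  # hypothetical logger name
    logger.handlers.clear()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


buffer = io.StringIO()  # a real Worker would pass sys.stdout here
log = make_logger(buffer)
log.info(
    "task finished",
    extra={"job_id": "job-1", "query_id": "q-7", "task_id": "t-42", "worker_id": "w-0"},
)
```

With every line carrying the same four keys, a single label or field filter in Loki or ELK is enough to collect all Worker output for one query.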
P2-8 Spot/Preemptible Node Support
Distinguish two exit signals: SIGTERM (K8s eviction/forced termination, fast drain in 5s) and SIGUSR1 (Coordinator-initiated scale-down, graceful drain). The `preStop` lifecycle hook sends SIGUSR1, giving the Coordinator enough time to complete a graceful drain when it initiates the shutdown.
P2-9 Cloud Compatibility Testing
Full test matrix: EKS / GKE / AKS × K8s 1.28 / 1.29, validating Arrow Flight P2P connectivity and throughput under major CNIs (VPC CNI, Calico, Cilium).
Phase 3: Advanced / Long-Term
The following are not blockers for production use but provide value for specific scenarios; scheduling depends on community feedback.
6. Key Interface Definitions
6.1 Rust Traits (no changes needed; K8s implementations satisfy these as-is)
6.2 gRPC Control Plane Proto (control signals only, no partition bytes)
6.3 Python API
7. Open Questions
O1: Default behavior for dependency distribution
When a user calls `set_runner_k8s()` without specifying `working_dir`, how does local driver-side code (e.g. UDF function bodies) reach the Workers? Options: do nothing and rely on the user's custom image (simplest, but poor development experience); auto-detect `sys.path` and package and upload it (convenient, but opaque behavior); or require the user to specify it explicitly (safe, but adds friction).
O2: Is the Stateless/Actor Worker type split the right model?
P2-1 addresses GPU model persistence by introducing two Pod types (Stateless Workers and Actor Workers), but this adds type complexity to the scheduling layer (`WorkerAffinity`, differentiated PriorityClass, different teardown semantics). Is there a simpler alternative, for example having all Workers support model caching while the Coordinator maintains task-to-worker affinity purely at the scheduling layer, without distinguishing Pod types? This deserves thorough discussion before implementation begins.