From 2477fa3fc253fe6faa362c47317460a7a6950b47 Mon Sep 17 00:00:00 2001 From: Haiyang Sun Date: Sun, 3 May 2026 16:12:48 +0000 Subject: [PATCH 1/5] grpc udf protocol --- .../proto/src/main/protobuf/common.proto | 4 +- .../src/main/protobuf/udf_protocol.proto | 459 ++++++++++++++++++ 2 files changed, 461 insertions(+), 2 deletions(-) create mode 100644 udf/worker/proto/src/main/protobuf/udf_protocol.proto diff --git a/udf/worker/proto/src/main/protobuf/common.proto b/udf/worker/proto/src/main/protobuf/common.proto index ee032def73efe..7028b13571874 100644 --- a/udf/worker/proto/src/main/protobuf/common.proto +++ b/udf/worker/proto/src/main/protobuf/common.proto @@ -26,7 +26,7 @@ option java_multiple_files = true; // The UDF in & output data format. enum UDFWorkerDataFormat { UDF_WORKER_DATA_FORMAT_UNSPECIFIED = 0; - + // The worker accepts and produces Apache arrow batches. ARROW = 1; } @@ -42,7 +42,7 @@ enum UDFWorkerDataFormat { enum UDFProtoCommunicationPattern { UDF_PROTO_COMMUNICATION_PATTERN_UNSPECIFIED = 0; - // Data exachanged as a bidrectional + // Data exchanged as a bidirectional // stream of bytes. BIDIRECTIONAL_STREAMING = 1; } diff --git a/udf/worker/proto/src/main/protobuf/udf_protocol.proto b/udf/worker/proto/src/main/protobuf/udf_protocol.proto new file mode 100644 index 0000000000000..a4d997c966752 --- /dev/null +++ b/udf/worker/proto/src/main/protobuf/udf_protocol.proto @@ -0,0 +1,459 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +import "common.proto"; + +package org.apache.spark.udf.worker; + +option java_package = "org.apache.spark.udf.worker"; +option java_multiple_files = true; + +// ===================================================================== +// Language-agnostic UDF execution protocol. +// +// The Spark engine acts as the gRPC client; a UDF worker (in any +// language) acts as the gRPC server. +// ===================================================================== + +// The default UDF gRPC service. A worker that exposes this service +// MUST do so over the default connection of the worker specification. +// +// In future, additional connections (e.g. a separate channel) may be +// reserved by the worker spec for other purposes. +service Worker { + // Per-execution stream. Exactly one [[Init]] is sent first, followed + // by 0..N data batches in either direction, terminated by exactly + // one [[Finish]] or [[Cancel]] from the engine. The worker MUST + // respond with the matching Init / Finish / Cancel responses on the + // response stream. + // + // For stateful execution, the state is maintained per bi-directional + // stream, mapping to a `WorkerSession` on the engine side + // (`org.apache.spark.udf.worker.core.WorkerSession`). + rpc Execute(stream UdfRequest) returns (stream UdfResponse); + + // Worker-scoped management RPC, independent of any per-execution + // stream. Used for heartbeat, capability query, and graceful + // shutdown. Kept unary so it does not depend on the lifecycle of an + // active Execute stream. 
+ rpc Manage(WorkerRequest) returns (WorkerResponse); +} + +// ===================================================================== +// Execute stream -- envelope +// ===================================================================== + +// Engine -> Worker. Either a control message ([[Init]] / [[PayloadChunk]] +// / [[Finish]] / [[Cancel]]) or a data message. +message UdfRequest { + // Exactly one branch MUST be set; receivers MUST reject messages + // with no branch set. + oneof request { + UdfControlRequest control = 1; + DataRequest data = 2; + } +} + +// Worker -> Engine. Either a control response ([[InitResponse]] / +// [[FinishResponse]] / [[CancelResponse]]) or a data response message. +message UdfResponse { + // Exactly one branch MUST be set; receivers MUST reject messages + // with no branch set. + oneof response { + UdfControlResponse control = 1; + DataResponse data = 2; + } +} + +// Engine -> Worker control messages. +// +// Wire order on an Execute stream is exactly: +// Init { ... } +// PayloadChunk { ... }* // optional; 0..N chunks, only used when +// // the single UDF payload on Init is too +// // large to fit inline. +// ( DataRequest | )* +// Finish { ... } OR Cancel { ... } // exactly one terminator +// +// The worker MUST emit [[InitResponse]] before sending any +// [[DataResponse]], and MUST emit exactly one [[FinishResponse]] or +// [[CancelResponse]] before closing the response stream. +// +// A worker that receives messages out of this order (e.g. a second Init, +// a PayloadChunk after the first DataRequest, a DataRequest before Init, +// or a Cancel before Init) MUST close the stream with an error. +message UdfControlRequest { + // Exactly one branch MUST be set; receivers MUST reject messages + // with no branch set. + oneof control { + Init init = 1; + PayloadChunk payload = 2; + Finish finish = 3; + Cancel cancel = 4; + } +} + +// Worker -> Engine control messages. 
+message UdfControlResponse { + // Exactly one branch MUST be set; receivers MUST reject messages + // with no branch set. + oneof control { + InitResponse init = 1; + FinishResponse finish = 2; + CancelResponse cancel = 3; + } +} + +// ===================================================================== +// Init phase +// ===================================================================== + +// Sent once, as the first message on an Execute stream. Describes +// the UDF body to run plus the minimum metadata the worker needs to +// start processing it. +// +// Today the protocol mandates exactly one Init per UDF execution +// (one Init -> data -> Finish). This is the simplest contract and +// covers all currently supported UDF kinds. In the future we may +// evolve to support multiple init phases on the same stream -- e.g. +// when worker setup requires an interactive handshake (negotiate a +// schema, exchange capabilities, fetch driver-side metadata, ...) +// before the data plane opens. Such an extension would be additive +// and would not change the single-Init semantics already in use. +// +// Engine vs. client split: +// * Most fields on Init are engine-side. They describe what +// flows on the wire for this session ([[data_format]] / +// [[input_schema]] / [[output_schema]] -- matching the worker +// spec, not the function's view) and what per-session +// context the worker needs ([[timezone]], [[session_conf]], +// [[task_context]], [[parameters]]). +// * [[UdfPayload]] carries everything the client side of Spark +// (where the UDF is defined and serialized) packs -- the +// callable bytes themselves, plus optional custom encoders +// that override the worker's built-in decoders only when the +// UDF deals in types the worker doesn't already know how to +// convert (e.g. recovering Arrow batches into client-provided +// Scala case classes or other user-defined types). 
+message Init { + // (Required) Wire format used for [[DataRequest.data]] and + // [[DataResponse.data]] for the life of this session. Must be + // one of the formats the worker declared in + // [[WorkerCapabilities.supported_data_formats]]; the client side + // of the protocol picks one at planning time and sticks with it. + // Receivers MUST reject an Init whose [[data_format]] is + // `UDF_WORKER_DATA_FORMAT_UNSPECIFIED`. + UDFWorkerDataFormat data_format = 1; + + // (Required) The UDF body to execute on the worker for this + // session. Exactly one payload per Execute stream. + UdfPayload udf = 2; + + // (Optional) Schema of the input data plane in the wire format + // declared by [[data_format]] -- e.g. an Arrow IPC schema when + // data_format = ARROW. This is an engine-side requirement: it + // describes the bytes the engine will actually put on + // [[DataRequest.data]] for this session, matching what the + // worker advertised in its spec. It is NOT necessarily the + // schema the function definer expressed; the UDF's own type + // information lives inside [[UdfPayload]] (embedded in the + // payload, or as a structured [[UdfPayload.input_encoder]] + // that converts wire bytes into language-native values). + // + // Left unset when the worker can derive the schema from the + // payload alone. + optional bytes input_schema = 3; + + // (Optional) Schema of the output data plane in the wire format + // declared by [[data_format]]. Same semantics as + // [[input_schema]] -- engine-side requirement describing the + // bytes the engine expects on [[DataResponse.data]]. + optional bytes output_schema = 4; + + // (Optional; defaults to an empty map.) Per-task context + // provided by the engine. Common keys identify the task instance + // for diagnostics, logging, and stateful workers -- e.g. + // partition id, task attempt id, stage id, micro-batch id. + // Engine and worker agree on the keys they share; the protocol + // does not enumerate them. 
+  map<string, string> task_context = 5;
+
+  // (Optional; defaults to an empty map.) Worker-private knobs not
+  // already captured by typed fields above. Free-form; both sides
+  // agree on the keys they need.
+  //
+  // Any key that two languages converge on is a candidate for
+  // promotion to a structured proto field -- once promoted, it gets
+  // a typed field number from the reserved range right after this
+  // block and is removed from [[session_conf]]. [[timezone]] below
+  // is an example of a key that has already been promoted.
+  map<string, string> session_conf = 6;
+
+  // (Optional) Session timezone, promoted out of [[session_conf]]
+  // because every eval needs it for timestamp encoding/decoding.
+  optional string timezone = 7;
+
+  // Reserved for future typed Init fields, in particular keys
+  // graduated from [[session_conf]] (see the [[timezone]] precedent
+  // above). Numbers >= 100 are intentionally NOT reserved here; if
+  // a future revision needs an opaque escape-hatch field, give it a
+  // number >= 100 alongside [[parameters]] and add a field-level
+  // comment so the convention stays visible.
+  reserved 8 to 99;
+
+  // (Optional) Engine-packed opaque parameters specific to a
+  // particular kind of UDF execution. The escape hatch for
+  // anything the engine needs the worker to see at init time
+  // that is not already captured by the typed fields above and
+  // does not fit naturally into [[task_context]]. The encoding
+  // is agreed between the engine and the worker; the protocol
+  // does not interpret it. The matching response, also opaque
+  // bytes, is returned via [[InitResponse.data]].
+  //
+  // Numbers >= 100 are reserved by convention for opaque
+  // escape-hatch fields like this one; new typed fields use the
+  // reserved 8..99 range.
+  //
+  // Client-side init data (anything packed by the layer that
+  // defines and serializes the UDF) does NOT belong here -- it
+  // travels inside [[UdfPayload.payload]] instead.
+ optional bytes parameters = 100; +} + +// Acknowledgment for [[Init]]. The worker MUST send exactly one +// [[InitResponse]] before any [[DataResponse]]. +// +// The init phase allows the engine to interact with the UDF before +// data starts flowing -- the worker can return inline bytes here for +// the engine (or higher-level code on the engine side) to consume +// during setup. The semantics of those bytes are agreed between the +// client side of the protocol and the worker; this message itself is +// otherwise opaque. +message InitResponse { + // (Optional) Inline init result returned by the worker. Opaque + // to the protocol; the client side of the protocol and the + // worker agree on what (if anything) it carries. + optional bytes data = 1; +} + +// Optional. Used to stream the single UDF payload when it does not +// fit in a single gRPC message. The default is to send the payload +// inline on [[UdfPayload.payload]]; chunking is only needed when a +// payload exceeds the gRPC message size limit. +// +// When used, chunks are sent zero or more times after [[Init]] and +// before the first [[DataRequest]]. The worker concatenates the +// inline [[UdfPayload.payload]] (if any) followed by all chunks in +// arrival order to form the final payload. +message PayloadChunk { + // (Required, non-empty.) Bytes appended to the [[Init.udf]] + // payload. + bytes data = 1; + + // (Optional) Set to true on the final chunk. Receivers MAY use + // this as an early signal that the payload is complete and + // decoding can begin; receivers that prefer to wait for the + // first [[DataRequest]] (which marks the end of the chunking + // phase) MAY ignore this. When unset, the receiver determines + // completeness by the arrival of the first [[DataRequest]]. 
+ optional bool last = 2; +} + +// ===================================================================== +// Data phase +// +// `data` is intentionally a top-level `bytes` field on both request +// and response messages -- not nested inside a wrapper -- so that +// implementations can avoid an extra copy when reading or writing +// the payload. The wire format (Arrow IPC etc.) is declared once per +// session via [[Init.data_format]] and stays the same for the life +// of the stream. +// ===================================================================== + +// Engine -> Worker per-batch payload. +message DataRequest { + // (Required, non-empty.) Encoded data bytes for one batch in the + // session-declared format. + bytes data = 1; +} + +// Worker -> Engine per-batch payload. The worker emits zero or more +// [[DataResponse]]s between [[InitResponse]] and [[FinishResponse]] / +// [[CancelResponse]]. Sink-style UDFs (which consume input but +// produce no output rows on the data plane) emit exactly zero. +message DataResponse { + // (Required, non-empty.) Encoded data bytes for one batch in the + // session-declared format. + bytes data = 1; +} + +// ===================================================================== +// Finish / Cancel phase +// ===================================================================== + +// Sent by the engine when no more input batches will arrive. The +// worker MUST drain any remaining output, then emit +// [[FinishResponse]] and close the response stream. +// +// Exactly one of [[Finish]] or [[Cancel]] is sent per Execute stream; +// they are mutually exclusive. If the engine has already sent +// [[Finish]] it MUST NOT send [[Cancel]] afterwards (and vice versa). +message Finish {} + +// Worker -> Engine completion message. May carry summary metrics. +message FinishResponse { + // Final metrics aggregated over the whole session (e.g. rows + // in/out, time per phase). Free-form; names are worker-defined. 
+  map<string, string> metrics = 1;
+
+  // (Optional) Inline finish result returned by the worker.
+  // Mirrors [[InitResponse.data]] -- the finish phase allows the
+  // engine to interact with the UDF after data has stopped
+  // flowing, with the worker returning opaque bytes the engine (or
+  // higher-level code) may consume during teardown. The semantics
+  // of those bytes are agreed between the client side of the
+  // protocol and the worker.
+  optional bytes data = 2;
+}
+
+// Engine -> Worker explicit cancel. Distinct from a gRPC stream error
+// so the worker can run cleanup deterministically (release file
+// handles, drop temp state, etc.). After receiving [[Cancel]] the
+// worker MUST stop emitting [[DataResponse]] messages, run cleanup,
+// and emit [[CancelResponse]] before closing.
+//
+// Exactly one of [[Finish]] or [[Cancel]] is sent per Execute stream;
+// see [[Finish]]. [[Cancel]] is the cooperative cancellation path;
+// gRPC-level stream errors are the involuntary fallback. If the
+// stream breaks before [[CancelResponse]] arrives, the engine
+// considers the worker uncancellable for this session and relies on
+// process-level cleanup.
+message Cancel {
+  // (Optional) Free-form reason for diagnostics.
+  optional string reason = 1;
+}
+
+// Worker -> Engine acknowledgment of [[Cancel]].
+message CancelResponse {}
+
+// The single UDF body delivered to the worker on [[Init]]. Opaque to
+// the engine: the engine forwards [[payload]] and [[format]]
+// unchanged, and the worker decodes them per the format the client
+// and worker have agreed on.
+message UdfPayload {
+  // (Required, may be empty when chunked.) Serialized callable
+  // bundle, opaque to the engine. The encoding is declared in
+  // [[format]].
+  //
+  // For payloads too large to fit on a single gRPC message, this
+  // field MAY be left empty (zero-length bytes) and the bytes
+  // delivered via the [[PayloadChunk]] mechanism instead. See
+  // [[PayloadChunk]] for chunking semantics.
+ bytes payload = 1; + + // (Required, non-empty.) Format tag identifying the encoding of + // [[payload]] (e.g. "py-cloudpickle-v3", "wasm-v1"). Engine does + // not interpret this; the client side of the protocol and the + // worker agree on its meaning. + string format = 2; + + // (Optional) Total payload size in bytes. Useful when chunked + // streaming is used so the worker can pre-allocate buffers. + optional int64 payload_size = 3; + + // (Optional) Human-readable name for diagnostics and metrics. + optional string name = 4; + + // (Optional) Worker / language-specific dispatch hint. A + // free-form string the worker uses to pick the code path that + // handles this payload. The protocol does not enumerate eval + // types because they are language-specific; the client side of + // the protocol and the worker agree on the namespace and the + // values. + // + // When the worker can derive the eval type from the payload + // itself (embedded metadata, format tag, etc.), this field is + // left unset. Otherwise the client side of the protocol sets it + // explicitly. + optional string eval_type = 5; + + // (Optional) Custom input encoder bytes. The worker already + // ships with built-in decoders for its standard types (e.g. a + // Python worker turns Arrow batches into pandas / pyarrow + // values out of the box; a JVM worker has its own defaults). + // Set this field only when the UDF needs a conversion the + // worker doesn't know about -- for example, recovering Arrow + // batches into client-provided Scala case classes, or any + // other user-defined type the function definer requires. + // + // Packed by the client side of the protocol; opaque to the + // wire protocol. Left unset whenever the worker's built-in + // decoders are sufficient. + optional bytes input_encoder = 6; + + // (Optional) Custom output encoder bytes. 
Mirror of + // [[input_encoder]]: set only when the UDF produces values the + // worker cannot convert to [[DataResponse.data]] using its + // built-in encoders, and the client side of the protocol needs + // to ship the conversion alongside the UDF. + optional bytes output_encoder = 7; +} + +// ===================================================================== +// Manage RPC -- worker-scoped operations independent of Execute +// ===================================================================== + +// Engine -> Worker. Wraps the manage operations in a oneof so the RPC +// is a single typed call, leaving room for future operations +// (capability query, profiling, ...). +message WorkerRequest { + // Exactly one branch MUST be set; receivers MUST reject messages + // with no branch set. + oneof manage { + Heartbeat heartbeat = 1; + ShutdownRequest shutdown = 2; + } +} + +// Worker -> Engine. +message WorkerResponse { + // Exactly one branch MUST be set, mirroring the request oneof. + oneof manage { + HeartbeatAck heartbeat = 1; + ShutdownResponse shutdown = 2; + } +} + +// Liveness probe. The engine may send this periodically to detect a +// hung worker. The worker SHOULD reply within a small bounded time. +message Heartbeat {} + +// Acknowledgment for [[Heartbeat]]. +message HeartbeatAck {} + +// Engine-initiated graceful shutdown request. Independent of SIGTERM +// (which is the OS-level fallback) -- this lets the worker know the +// engine has finished with it and intends no further Execute streams. +message ShutdownRequest { + // (Optional) Free-form reason for diagnostics. + optional string reason = 1; +} + +// Worker -> Engine acknowledgment of [[ShutdownRequest]]. +message ShutdownResponse {} From 0736d42f5cbf22d032cfd489617022cdcc8ac450 Mon Sep 17 00:00:00 2001 From: Haiyang Sun Date: Mon, 4 May 2026 07:53:04 +0000 Subject: [PATCH 2/5] update README, remove InitMessage place holder. 
--- udf/worker/README.md | 27 +++++++----- .../spark/udf/worker/core/WorkerSession.scala | 41 +++++-------------- .../core/DirectWorkerDispatcherSuite.scala | 4 +- 3 files changed, 30 insertions(+), 42 deletions(-) diff --git a/udf/worker/README.md b/udf/worker/README.md index b843c430d0e04..707005721c693 100644 --- a/udf/worker/README.md +++ b/udf/worker/README.md @@ -19,7 +19,7 @@ WorkerDispatcher -- manages workers, creates sessions | v WorkerSession -- one UDF execution - | 1. session.init(InitMessage(payload, inputSchema, outputSchema)) + | 1. session.init(Init proto: udf payload + data format + schemas) | 2. val results = session.process(inputBatches) | 3. session.close() ``` @@ -34,12 +34,13 @@ provisioning service or daemon). ``` udf/worker/ ├── proto/ -│ worker_spec.proto -- UDFWorkerSpecification protobuf (+ generated Java classes) +│ worker_spec.proto -- UDFWorkerSpecification protobuf +│ udf_protocol.proto -- UDF execution protocol (Init, UdfPayload, ...) │ common.proto -- shared enums (UDFWorkerDataFormat, etc.) │ └── core/ -- abstract interfaces WorkerDispatcher.scala -- creates sessions, manages worker lifecycle - WorkerSession.scala -- per-UDF init/process/cancel/close + InitMessage + WorkerSession.scala -- per-UDF init/process/cancel/close WorkerConnection.scala -- transport channel abstraction WorkerSecurityScope.scala -- security boundary for worker pooling │ @@ -76,10 +77,12 @@ Workers are terminated via SIGTERM/SIGKILL when the dispatcher is closed. 
```scala import org.apache.spark.udf.worker.{ - DirectWorker, ProcessCallable, UDFProtoCommunicationPattern, - UDFWorkerDataFormat, UDFWorkerProperties, UDFWorkerSpecification, - UnixDomainSocket, WorkerCapabilities, WorkerConnectionSpec, WorkerEnvironment} + DirectWorker, Init, ProcessCallable, UdfPayload, + UDFProtoCommunicationPattern, UDFWorkerDataFormat, UDFWorkerProperties, + UDFWorkerSpecification, UnixDomainSocket, WorkerCapabilities, + WorkerConnectionSpec, WorkerEnvironment} import org.apache.spark.udf.worker.core._ +import com.google.protobuf.ByteString // 1. Define a worker spec (direct creation mode). val spec = UDFWorkerSpecification.newBuilder() @@ -112,10 +115,14 @@ val dispatcher: WorkerDispatcher = ... val session = dispatcher.createSession(securityScope = None) try { // 4. Initialize with the serialized function and schemas. - session.init(InitMessage( - functionPayload = serializedFunction, - inputSchema = arrowInputSchema, - outputSchema = arrowOutputSchema)) + session.init(Init.newBuilder() + .setUdf(UdfPayload.newBuilder() + .setPayload(ByteString.copyFrom(serializedFunction)) + .setFormat("py-cloudpickle-v3")) + .setDataFormat(UDFWorkerDataFormat.ARROW) + .setInputSchema(ByteString.copyFrom(arrowInputSchema)) + .setOutputSchema(ByteString.copyFrom(arrowOutputSchema)) + .build()) // 5. Process data -- Iterator in, Iterator out. 
val results: Iterator[Array[Byte]] = diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala index f4c4091688c94..f122dbd6c0d21 100644 --- a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala +++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala @@ -19,31 +19,7 @@ package org.apache.spark.udf.worker.core import java.util.concurrent.atomic.AtomicBoolean import org.apache.spark.annotation.Experimental - -/** - * :: Experimental :: - * Carries all information needed to initialize a UDF execution on a worker. - * - * This message is passed to [[WorkerSession#init]] and contains the function - * definition, schemas, and any additional configuration. - * - * Placeholder: will be replaced by a generated proto message once the - * UDF wire protocol lands. Do not rely on case-class equality -- - * `Array[Byte]` fields compare by reference. - * - * @param functionPayload serialized function (e.g., pickled Python, JVM bytes) - * @param inputSchema serialized input schema (e.g., Arrow schema bytes) - * @param outputSchema serialized output schema (e.g., Arrow schema bytes) - * @param properties additional key-value configuration. Can carry - * protocol-specific or engine-specific metadata that - * does not yet have a dedicated field. 
- */ -@Experimental -case class InitMessage( - functionPayload: Array[Byte], - inputSchema: Array[Byte], - outputSchema: Array[Byte], - properties: Map[String, String] = Map.empty) +import org.apache.spark.udf.worker.Init /** * :: Experimental :: @@ -62,7 +38,10 @@ case class InitMessage( * {{{ * val session = dispatcher.createSession(securityScope = None) * try { - * session.init(InitMessage(functionPayload, inputSchema, outputSchema)) + * session.init(Init.newBuilder() + * .setUdf(UdfPayload.newBuilder().setPayload(callable).setFormat(fmt)) + * .setDataFormat(UDFWorkerDataFormat.ARROW) + * .build()) * val results = session.process(inputBatches) * results.foreach(handleBatch) * } finally { @@ -93,10 +72,12 @@ abstract class WorkerSession extends AutoCloseable { * * Throws `IllegalStateException` if called more than once. * - * @param message the initialization parameters including the serialized - * function, input/output schemas, and configuration. + * @param message the [[Init]] proto carrying the UDF body, the wire + * data format, optional input/output schemas, and any + * engine-side session context the worker needs to start + * processing. */ - final def init(message: InitMessage): Unit = { + final def init(message: Init): Unit = { if (!initialized.compareAndSet(false, true)) { throw new IllegalStateException("init has already been called on this session") } @@ -128,7 +109,7 @@ abstract class WorkerSession extends AutoCloseable { } /** Subclass hook for [[init]]. Called once, after the guard. */ - protected def doInit(message: InitMessage): Unit + protected def doInit(message: Init): Unit /** Subclass hook for [[process]]. Called at most once, after the guard. 
*/ protected def doProcess(input: Iterator[Array[Byte]]): Iterator[Array[Byte]] diff --git a/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/DirectWorkerDispatcherSuite.scala b/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/DirectWorkerDispatcherSuite.scala index 60f5e2211b702..43444ed89b9c6 100644 --- a/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/DirectWorkerDispatcherSuite.scala +++ b/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/DirectWorkerDispatcherSuite.scala @@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfterEach import org.scalatest.funsuite.AnyFunSuite import org.apache.spark.udf.worker.{ - DirectWorker, LocalTcpConnection, ProcessCallable, UDFWorkerProperties, + DirectWorker, Init, LocalTcpConnection, ProcessCallable, UDFWorkerProperties, UDFWorkerSpecification, UnixDomainSocket, WorkerConnectionSpec, WorkerEnvironment} import org.apache.spark.udf.worker.core.direct.{DirectUnixSocketWorkerDispatcher, @@ -58,7 +58,7 @@ class SocketFileConnection(socketPath: String) class StubWorkerSession( workerProcess: DirectWorkerProcess) extends DirectWorkerSession(workerProcess) { - override protected def doInit(message: InitMessage): Unit = {} + override protected def doInit(message: Init): Unit = {} override protected def doProcess( input: Iterator[Array[Byte]]): Iterator[Array[Byte]] = From 973b037792420c4be0b7fc3bdd77ab6b7924aa7b Mon Sep 17 00:00:00 2001 From: Haiyang Sun Date: Tue, 5 May 2026 08:22:56 +0000 Subject: [PATCH 3/5] address comments --- .../spark/udf/worker/core/WorkerSession.scala | 37 +++++++++-- .../src/main/protobuf/udf_protocol.proto | 65 +++++++++---------- 2 files changed, 63 insertions(+), 39 deletions(-) diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala index f122dbd6c0d21..f34c15459eb34 100644 --- 
a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala +++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala @@ -53,7 +53,18 @@ import org.apache.spark.udf.worker.Init * - [[init]] must be called exactly once before [[process]]. * - [[process]] must be called at most once per session. * - [[close]] must always be called (use try-finally). - * - [[cancel]] may be called at any time to abort execution. + * - [[cancel]] may be called at any time, including before [[init]] + * or after [[process]]/[[close]] has returned. Implementations + * treat such calls as a no-op so that callers driven by a task + * interruption listener (which has no view into the session state) + * do not need to coordinate with the thread driving [[process]]. + * + * Cancel-vs-finish race: when the session driver has finished + * sending input (and therefore queued an implicit finish on the + * underlying transport) and a [[cancel]] arrives concurrently, both + * are valid stream-terminating actions; the response side carries + * either a `FinishResponse` or a `CancelResponse` depending on which + * the worker observes first, and either is acceptable to the caller. * * The lifecycle is enforced here: [[init]] and [[process]] are `final` * and delegate to [[doInit]] / [[doProcess]] after AtomicBoolean guards. @@ -119,11 +130,29 @@ abstract class WorkerSession extends AutoCloseable { * * '''Thread-safety:''' implementations must allow [[cancel]] to be called * from a thread different from the one driving [[process]] (typically a - * task interruption thread). It may be invoked at any point after - * [[init]] and should be a no-op if execution has already finished. + * task interruption thread). 
+ * + * '''Lifecycle:''' [[cancel]] is idempotent and safe at any point in + * the session's life: + * - before [[init]] -- nothing has been sent on the transport yet, + * so [[cancel]] is a no-op (the session may still be closed + * normally via [[close]]). + * - between [[init]] and [[process]] -- transitions the session + * into a cancelled state; subsequent [[process]] calls observe + * the cancellation. + * - during [[process]] -- aborts the active stream. + * - after [[process]] / [[close]] has returned -- a no-op. + * + * Implementations are responsible for the no-op behavior described + * above so that callers (e.g. task interruption listeners) do not + * need to coordinate with the thread driving [[process]]. */ def cancel(): Unit - /** Closes this session and releases resources. */ + /** + * Closes this session and releases resources. Idempotent; safe to + * call from a `finally` block regardless of whether [[init]], + * [[process]], or [[cancel]] have been invoked. + */ override def close(): Unit } diff --git a/udf/worker/proto/src/main/protobuf/udf_protocol.proto b/udf/worker/proto/src/main/protobuf/udf_protocol.proto index a4d997c966752..c5ed5987a7812 100644 --- a/udf/worker/proto/src/main/protobuf/udf_protocol.proto +++ b/udf/worker/proto/src/main/protobuf/udf_protocol.proto @@ -146,11 +146,10 @@ message UdfControlResponse { // [[task_context]], [[parameters]]). // * [[UdfPayload]] carries everything the client side of Spark // (where the UDF is defined and serialized) packs -- the -// callable bytes themselves, plus optional custom encoders -// that override the worker's built-in decoders only when the -// UDF deals in types the worker doesn't already know how to -// convert (e.g. recovering Arrow batches into client-provided -// Scala case classes or other user-defined types). +// serialized callable, an opaque format tag, and any encoder +// metadata bundled with the callable. 
The wire protocol does +// not enumerate encoder shapes; that is left to the client and +// worker to agree on per UDF type. message Init { // (Required) Wire format used for [[DataRequest.data]] and // [[DataResponse.data]] for the life of this session. Must be @@ -172,9 +171,9 @@ message Init { // [[DataRequest.data]] for this session, matching what the // worker advertised in its spec. It is NOT necessarily the // schema the function definer expressed; the UDF's own type - // information lives inside [[UdfPayload]] (embedded in the - // payload, or as a structured [[UdfPayload.input_encoder]] - // that converts wire bytes into language-native values). + // information lives inside [[UdfPayload]], typically embedded + // alongside the callable in [[UdfPayload.payload]] (e.g. as + // input/output encoders chosen per UDF type). // // Left unset when the worker can derive the schema from the // payload alone. @@ -207,6 +206,13 @@ message Init { // (Optional) Session timezone, promoted out of [[session_conf]] // because every eval needs it for timestamp encoding/decoding. + // + // Format follows Spark's `spark.sql.session.timeZone` config -- + // typically an IANA TZ id (e.g. "America/Los_Angeles") or a + // fixed offset (e.g. "+08:00"). The engine MUST pass the value + // it would resolve from the session conf without further + // transformation, so the worker can interpret it the same way + // Spark does. optional string timezone = 7; // Reserved for future typed Init fields, in particular keys @@ -261,6 +267,11 @@ message InitResponse { // before the first [[DataRequest]]. The worker concatenates the // inline [[UdfPayload.payload]] (if any) followed by all chunks in // arrival order to form the final payload. +// +// Chunks are part of the Init handshake, not standalone control +// messages: they extend [[Init.udf.payload]] and are not +// individually acknowledged. The single [[InitResponse]] covers +// Init plus all of its chunks together. 
message PayloadChunk { // (Required, non-empty.) Bytes appended to the [[Init.udf]] // payload. @@ -357,9 +368,14 @@ message CancelResponse {} // unchanged, and the worker decodes them per the format the client // and worker have agreed on. message UdfPayload { - // (Required, may be empty when chunked.) Serialized callable - // bundle, opaque to the engine. The encoding is declared in - // [[format]]. + // (Required, may be empty when chunked.) Serialized UDF bundle, + // opaque to the engine. The encoding is declared in [[format]]. + // + // The bundle is not necessarily just the serialized callable; + // it is up to the client side of the protocol and the worker to + // agree on what is packed inside it -- e.g. custom encoders for + // user-defined types, type hints, or any other metadata the + // worker needs to invoke the UDF. // // For payloads too large to fit on a single gRPC message, this // field MAY be left empty (zero-length bytes) and the bytes @@ -392,27 +408,6 @@ message UdfPayload { // left unset. Otherwise the client side of the protocol sets it // explicitly. optional string eval_type = 5; - - // (Optional) Custom input encoder bytes. The worker already - // ships with built-in decoders for its standard types (e.g. a - // Python worker turns Arrow batches into pandas / pyarrow - // values out of the box; a JVM worker has its own defaults). - // Set this field only when the UDF needs a conversion the - // worker doesn't know about -- for example, recovering Arrow - // batches into client-provided Scala case classes, or any - // other user-defined type the function definer requires. - // - // Packed by the client side of the protocol; opaque to the - // wire protocol. Left unset whenever the worker's built-in - // decoders are sufficient. - optional bytes input_encoder = 6; - - // (Optional) Custom output encoder bytes. 
Mirror of - // [[input_encoder]]: set only when the UDF produces values the - // worker cannot convert to [[DataResponse.data]] using its - // built-in encoders, and the client side of the protocol needs - // to ship the conversion alongside the UDF. - optional bytes output_encoder = 7; } // ===================================================================== @@ -435,8 +430,8 @@ message WorkerRequest { message WorkerResponse { // Exactly one branch MUST be set, mirroring the request oneof. oneof manage { - HeartbeatAck heartbeat = 1; - ShutdownResponse shutdown = 2; + HeartbeatResponse heartbeat = 1; + ShutdownResponse shutdown = 2; } } @@ -445,7 +440,7 @@ message WorkerResponse { message Heartbeat {} // Acknowledgment for [[Heartbeat]]. -message HeartbeatAck {} +message HeartbeatResponse {} // Engine-initiated graceful shutdown request. Independent of SIGTERM // (which is the OS-level fallback) -- this lets the worker know the From 5c30dda52e738ad660c5169fd2b52931f4729eaa Mon Sep 17 00:00:00 2001 From: Haiyang Sun Date: Thu, 7 May 2026 04:31:57 +0000 Subject: [PATCH 4/5] address comments. --- udf/worker/proto/src/main/protobuf/udf_protocol.proto | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/udf/worker/proto/src/main/protobuf/udf_protocol.proto b/udf/worker/proto/src/main/protobuf/udf_protocol.proto index c5ed5987a7812..00980b14a077a 100644 --- a/udf/worker/proto/src/main/protobuf/udf_protocol.proto +++ b/udf/worker/proto/src/main/protobuf/udf_protocol.proto @@ -384,9 +384,10 @@ message UdfPayload { bytes payload = 1; // (Required, non-empty.) Format tag identifying the encoding of - // [[payload]] (e.g. "py-cloudpickle-v3", "wasm-v1"). Engine does - // not interpret this; the client side of the protocol and the - // worker agree on its meaning. + // [[payload]]. 
The protocol does not enumerate the values: the + // client side of the protocol and the worker agree on the + // namespace, and each worker recognises the tags it knows how + // to decode. The engine forwards this string unchanged. string format = 2; // (Optional) Total payload size in bytes. Useful when chunked From 7ca4b62fbc73c4c2b998c442c635d006e266e8cc Mon Sep 17 00:00:00 2001 From: Haiyang Sun Date: Thu, 7 May 2026 06:36:03 +0000 Subject: [PATCH 5/5] fix README --- udf/worker/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/udf/worker/README.md b/udf/worker/README.md index 707005721c693..8a0c12683a48a 100644 --- a/udf/worker/README.md +++ b/udf/worker/README.md @@ -118,7 +118,7 @@ try { session.init(Init.newBuilder() .setUdf(UdfPayload.newBuilder() .setPayload(ByteString.copyFrom(serializedFunction)) - .setFormat("py-cloudpickle-v3")) + .setFormat(payloadFormat)) // worker-recognised tag .setDataFormat(UDFWorkerDataFormat.ARROW) .setInputSchema(ByteString.copyFrom(arrowInputSchema)) .setOutputSchema(ByteString.copyFrom(arrowOutputSchema))