From 98617123763e63f6c82cd3a8ebbeaa4f369fecd5 Mon Sep 17 00:00:00 2001 From: RtlZeroMemory <58250858+RtlZeroMemory@users.noreply.github.com> Date: Tue, 17 Mar 2026 16:59:08 +0400 Subject: [PATCH] refactor(node): split node backend by concern --- packages/node/src/backend/nodeBackend.ts | 784 ++++-------------- .../src/backend/nodeBackend/debugChannel.ts | 60 ++ .../src/backend/nodeBackend/executionMode.ts | 66 ++ .../src/backend/nodeBackend/frameTracking.ts | 202 +++++ .../src/backend/nodeBackend/frameTransport.ts | 192 +++++ .../node/src/backend/nodeBackend/shared.ts | 108 +++ .../node/src/backend/nodeBackendInline.ts | 24 +- 7 files changed, 778 insertions(+), 658 deletions(-) create mode 100644 packages/node/src/backend/nodeBackend/debugChannel.ts create mode 100644 packages/node/src/backend/nodeBackend/executionMode.ts create mode 100644 packages/node/src/backend/nodeBackend/frameTracking.ts create mode 100644 packages/node/src/backend/nodeBackend/frameTransport.ts create mode 100644 packages/node/src/backend/nodeBackend/shared.ts diff --git a/packages/node/src/backend/nodeBackend.ts b/packages/node/src/backend/nodeBackend.ts index 72f48c67..96342ce9 100644 --- a/packages/node/src/backend/nodeBackend.ts +++ b/packages/node/src/backend/nodeBackend.ts @@ -7,8 +7,7 @@ * @see docs/backend/native.md */ -import { existsSync } from "node:fs"; -import { Worker, type WorkerOptions } from "node:worker_threads"; +import { Worker } from "node:worker_threads"; import type { BackendEventBatch, DebugBackend, @@ -20,7 +19,7 @@ import type { TerminalCaps, TerminalProfile, } from "@rezi-ui/core"; -import { DEFAULT_TERMINAL_CAPS, FRAME_ACCEPTED_ACK_MARKER } from "@rezi-ui/core"; +import { DEFAULT_TERMINAL_CAPS } from "@rezi-ui/core"; import { ZR_DRAWLIST_VERSION_V1, ZR_ENGINE_ABI_MAJOR, @@ -32,29 +31,17 @@ import { severityToNum, } from "@rezi-ui/core"; import type { BackendBeginFrame } from "@rezi-ui/core/backend"; -import { - createFrameAuditLogger, - drawlistFingerprint, - maybeDumpDrawlistBytes, -} from "../frameAudit.js"; +import { createFrameAuditLogger } from "../frameAudit.js"; import { type EngineCreateConfig, - FRAME_SAB_CONTROL_CONSUMED_SEQ_WORD, - FRAME_SAB_CONTROL_HEADER_WORDS, - FRAME_SAB_CONTROL_PUBLISHED_BYTES_WORD, FRAME_SAB_CONTROL_PUBLISHED_SEQ_WORD, - FRAME_SAB_CONTROL_PUBLISHED_SLOT_WORD, - FRAME_SAB_CONTROL_PUBLISHED_TOKEN_WORD, - FRAME_SAB_CONTROL_WORDS_PER_SLOT, FRAME_SAB_SLOT_STATE_FREE, FRAME_SAB_SLOT_STATE_READY, - FRAME_SAB_SLOT_STATE_WRITING, FRAME_TRANSPORT_SAB_V1, FRAME_TRANSPORT_TRANSFER_V1, FRAME_TRANSPORT_VERSION, type FrameTransportConfig, type MainToWorkerMessage, - type PerfSnapshotWire, type WorkerToMainMessage, } from "../worker/protocol.js"; import { @@ -71,376 +58,65 @@ import { import { DEBUG_QUERY_DEFAULT_RECORDS, DEBUG_QUERY_MAX_RECORDS } from "./backendSharedDebug.js"; import { attachBackendMarkers } from "./backendSharedMarkers.js"; import { applyEmojiWidthPolicy, resolveBackendEmojiWidthPolicy } from "./emojiWidthPolicy.js"; +import { + assertWorkerEnvironmentSupported, + hasInteractiveTty, + resolveWorkerEntry, + selectNodeBackendExecutionMode, +} from "./nodeBackend/executionMode.js"; +import { + createNodeBackendFrameTrackingState, + registerFrameAudit, + rejectFrameWaiters, + releaseFrameReservation, + reserveFramePromise, + resolveAcceptedFramesUpTo, + resolveCoalescedCompletionFramesUpTo, + settleCompletedFrame, +} from "./nodeBackend/frameTracking.js"; +import { + FRAME_SAB_SLOT_BYTES_DEFAULT, + FRAME_SAB_SLOT_COUNT_DEFAULT, + acquireSabSlot, + acquireSabSlotTracked, + copyInto, + createSabFrameTransport, + frameSeqToSlotToken, + publishSabFrame, + resetSabFrameTransport, +} from "./nodeBackend/frameTransport.js"; +export type { + NodeBackend, + NodeBackendConfig, + NodeBackendExecutionModeSelection, + NodeBackendExecutionModeSelectionInput, + NodeBackendInternalOpts, + NodeBackendPerf, + NodeBackendPerfSnapshot, +} from "./nodeBackend/shared.js"; +import { + createNodeBackendDebugChannelState, + enqueueDebug, + rejectDebugWaiters, +} from "./nodeBackend/debugChannel.js"; +import { deferred, safeErr } from "./nodeBackend/shared.js"; +import type { + Deferred, + NodeBackend, + NodeBackendInternalOpts, + NodeBackendPerf, +} from "./nodeBackend/shared.js"; import { createNodeBackendInlineInternal } from "./nodeBackendInline.js"; import { terminalProfileFromNodeEnv } from "./terminalProfile.js"; -export type NodeBackendConfig = Readonly<{ - /** - * Runtime execution mode: - * - "auto": pick inline only for very low fps caps (<=30), worker otherwise - * - "worker": worker-thread engine ownership - * - "inline": single-thread inline backend (no worker-hop transport) - */ - executionMode?: "auto" | "worker" | "inline"; - /** - * @deprecated Prefer createNodeApp({ config: { fpsCap } }) so app/core and backend - * remain aligned by construction. - */ - fpsCap?: number; - /** - * @deprecated Prefer createNodeApp({ config: { maxEventBytes } }) so app/core and backend - * remain aligned by construction. - */ - maxEventBytes?: number; - /** - * Frame transport mode: - * - "auto": prefer SAB mailbox transport when available, fallback to transfer. - * - "transfer": always use transferable ArrayBuffer path. - * - "sab": require SAB mailbox path when available, fallback to transfer when unavailable. - */ - frameTransport?: "auto" | "transfer" | "sab"; - /** SAB mailbox slot count (default: 8). */ - frameSabSlotCount?: number; - /** SAB mailbox bytes per slot (default: 1 MiB). */ - frameSabSlotBytes?: number; - /** - * Extra native `engine_create` configuration passed through to the addon (e.g. `limits`). - * Keys are forwarded as-is (camelCase or snake_case accepted by the native parser). - */ - nativeConfig?: Readonly>; - /** - * Emoji width policy used to keep core layout measurement and native rendering aligned. - * - "auto": use native/env overrides; optional probe when `ZRUI_EMOJI_WIDTH_PROBE=1` - * then fallback to deterministic "wide" - * - "wide": emoji clusters consume 2 cells - * - "narrow": emoji clusters consume 1 cell - * - * This sets core text measurement policy and native `widthPolicy` together. - */ - emojiWidthPolicy?: "auto" | "wide" | "narrow"; -}>; - -export type NodeBackendInternalOpts = Readonly<{ - config?: NodeBackendConfig; - nativeShimModule?: string; -}>; - -export type NodeBackendPerfSnapshot = Readonly<{ - phases: Readonly< - Record< - string, - { - count: number; - avg: number; - p50: number; - p95: number; - p99: number; - max: number; - worst10: readonly number[]; - } - > - >; -}>; - -export type NodeBackendPerf = Readonly<{ - perfSnapshot: () => Promise; -}>; - -export type NodeBackend = RuntimeBackend & Readonly<{ debug: DebugBackend; perf: NodeBackendPerf }>; - -export type NodeBackendExecutionModeSelectionInput = Readonly<{ - requestedExecutionMode: "auto" | "worker" | "inline"; - fpsCap: number; - nativeShimModule?: string; - hasAnyTty: boolean; -}>; - -export type NodeBackendExecutionModeSelection = Readonly<{ - resolvedExecutionMode: "worker" | "inline"; - selectedExecutionMode: "worker" | "inline"; - fallbackReason: string | null; -}>; - -type Deferred = Readonly<{ - promise: Promise; - resolve: (v: T) => void; - reject: (err: Error) => void; -}>; - -type WorkerEntryResolution = Readonly<{ - entry: URL; - options: WorkerOptions; -}>; - -type SabFrameTransport = Readonly<{ - control: SharedArrayBuffer; - data: SharedArrayBuffer; - slotCount: number; - slotBytes: number; - controlHeader: Int32Array; - states: Int32Array; - tokens: Int32Array; - dataBytes: Uint8Array; - nextSlot: { value: number }; -}>; - type BeginFrameMetrics = { success: number; fallbackToRequestFrame: number; readyReclaims: number; }; -type FrameAuditEntry = { - frameSeq: number; - submitAtMs: number; - submitPath: "requestFrame" | "beginFrame"; - transport: typeof FRAME_TRANSPORT_TRANSFER_V1 | typeof FRAME_TRANSPORT_SAB_V1; - byteLen: number; - hash32: string; - prefixHash32: string; - cmdCount: number | null; - totalSize: number | null; - head16: string; - tail16: string; - slotIndex?: number; - slotToken?: number; - acceptedLogged?: boolean; -}; - const WIDTH_POLICY_KEY = "widthPolicy" as const; - -function deferred(): Deferred { - let resolve!: (v: T) => void; - let reject!: (err: Error) => void; - const promise = new Promise((res, rej) => { - resolve = res; - reject = (err: unknown) => rej(err instanceof Error ? err : new Error(String(err))); - }); - return { promise, resolve, reject }; -} - -function safeErr(err: unknown): Error { - return err instanceof Error ? err : new Error(String(err)); -} - -function resolveWorkerEntry(workerData: WorkerOptions["workerData"]): WorkerEntryResolution { - const options: WorkerOptions = { workerData }; - const workerEntryJs = new URL("../worker/engineWorker.js", import.meta.url); - if (existsSync(workerEntryJs)) { - return { entry: workerEntryJs, options }; - } - - // Source-mode worktrees do not emit sibling .js worker files under src. - // Use a JS bootstrap that registers tsx and then imports engineWorker.ts. - const workerEntryBootstrapJs = new URL("../worker/engineWorker.bootstrap.js", import.meta.url); - if (existsSync(workerEntryBootstrapJs)) { - return { entry: workerEntryBootstrapJs, options }; - } - - throw new ZrUiError( - "ZRUI_BACKEND_ERROR", - "Unable to locate worker entry (expected engineWorker.js or engineWorker.bootstrap.js)", - ); -} - -function hasInteractiveTty(): boolean { - return ( - process.stdin.isTTY === true || process.stdout.isTTY === true || process.stderr.isTTY === true - ); -} - -export function selectNodeBackendExecutionMode( - input: NodeBackendExecutionModeSelectionInput, -): NodeBackendExecutionModeSelection { - const { requestedExecutionMode, fpsCap } = input; - const resolvedExecutionMode: "worker" | "inline" = - requestedExecutionMode === "inline" - ? "inline" - : requestedExecutionMode === "worker" - ? "worker" - : fpsCap <= 30 - ? "inline" - : "worker"; - return { - resolvedExecutionMode, - selectedExecutionMode: resolvedExecutionMode, - fallbackReason: null, - }; -} - -function assertWorkerEnvironmentSupported(nativeShimModule: string | undefined): void { - if (nativeShimModule !== undefined) return; - if (hasInteractiveTty()) return; - throw new ZrUiError( - "ZRUI_BACKEND_ERROR", - 'Worker backend requires a TTY when using @rezi-ui/native. Use `executionMode: "inline"` for headless runs or pass `nativeShimModule` in test harnesses.', - ); -} - -const FRAME_SAB_SLOT_COUNT_DEFAULT = 8 as const; -const FRAME_SAB_SLOT_BYTES_DEFAULT = 1 << 20; - -function copyInto(buf: ArrayBuffer, bytes: Uint8Array): void { - new Uint8Array(buf, 0, bytes.byteLength).set(bytes); -} - -function frameSeqToSlotToken(frameSeq: number): number { - const token = frameSeq & 0x7fff_ffff; - return token === 0 ? 1 : token; -} - -function createSabFrameTransport(slotCount: number, slotBytes: number): SabFrameTransport | null { - if (typeof SharedArrayBuffer !== "function") return null; - const control = new SharedArrayBuffer( - (FRAME_SAB_CONTROL_HEADER_WORDS + slotCount * FRAME_SAB_CONTROL_WORDS_PER_SLOT) * - Int32Array.BYTES_PER_ELEMENT, - ); - const controlHeader = new Int32Array(control, 0, FRAME_SAB_CONTROL_HEADER_WORDS); - const states = new Int32Array( - control, - FRAME_SAB_CONTROL_HEADER_WORDS * Int32Array.BYTES_PER_ELEMENT, - slotCount, - ); - const tokens = new Int32Array( - control, - (FRAME_SAB_CONTROL_HEADER_WORDS + slotCount) * Int32Array.BYTES_PER_ELEMENT, - slotCount, - ); - Atomics.store(controlHeader, FRAME_SAB_CONTROL_PUBLISHED_SEQ_WORD, 0); - Atomics.store(controlHeader, FRAME_SAB_CONTROL_PUBLISHED_SLOT_WORD, 0); - Atomics.store(controlHeader, FRAME_SAB_CONTROL_PUBLISHED_BYTES_WORD, 0); - Atomics.store(controlHeader, FRAME_SAB_CONTROL_PUBLISHED_TOKEN_WORD, 0); - Atomics.store(controlHeader, FRAME_SAB_CONTROL_CONSUMED_SEQ_WORD, 0); - for (let i = 0; i < slotCount; i++) { - Atomics.store(states, i, FRAME_SAB_SLOT_STATE_FREE); - Atomics.store(tokens, i, 0); - } - const data = new SharedArrayBuffer(slotCount * slotBytes); - return { - control, - data, - slotCount, - slotBytes, - controlHeader, - states, - tokens, - dataBytes: new Uint8Array(data), - nextSlot: { value: 0 }, - }; -} - -function resetSabFrameTransport(t: SabFrameTransport): void { - Atomics.store(t.controlHeader, FRAME_SAB_CONTROL_PUBLISHED_SEQ_WORD, 0); - Atomics.store(t.controlHeader, FRAME_SAB_CONTROL_PUBLISHED_SLOT_WORD, 0); - Atomics.store(t.controlHeader, FRAME_SAB_CONTROL_PUBLISHED_BYTES_WORD, 0); - Atomics.store(t.controlHeader, FRAME_SAB_CONTROL_PUBLISHED_TOKEN_WORD, 0); - Atomics.store(t.controlHeader, FRAME_SAB_CONTROL_CONSUMED_SEQ_WORD, 0); - for (let i = 0; i < t.slotCount; i++) { - Atomics.store(t.states, i, FRAME_SAB_SLOT_STATE_FREE); - Atomics.store(t.tokens, i, 0); - } - t.nextSlot.value = 0; -} - -type SabSlotAcquireResult = Readonly<{ - slotIndex: number; - reclaimedReady: boolean; -}>; - -function acquireSabSlot(t: SabFrameTransport): number { - const start = t.nextSlot.value % t.slotCount; - for (let i = 0; i < t.slotCount; i++) { - const slot = (start + i) % t.slotCount; - const prev = Atomics.compareExchange( - t.states, - slot, - FRAME_SAB_SLOT_STATE_FREE, - FRAME_SAB_SLOT_STATE_WRITING, - ); - if (prev === FRAME_SAB_SLOT_STATE_FREE) { - t.nextSlot.value = (slot + 1) % t.slotCount; - return slot; - } - } - // Latest-wins semantics allow reclaiming stale READY slots instead of - // falling back to transfer under pressure. - for (let i = 0; i < t.slotCount; i++) { - const slot = (start + i) % t.slotCount; - const prev = Atomics.compareExchange( - t.states, - slot, - FRAME_SAB_SLOT_STATE_READY, - FRAME_SAB_SLOT_STATE_WRITING, - ); - if (prev === FRAME_SAB_SLOT_STATE_READY) { - t.nextSlot.value = (slot + 1) % t.slotCount; - return slot; - } - } - return -1; -} - -function acquireSabSlotTracked(t: SabFrameTransport): SabSlotAcquireResult { - const start = t.nextSlot.value % t.slotCount; - for (let i = 0; i < t.slotCount; i++) { - const slot = (start + i) % t.slotCount; - const prev = Atomics.compareExchange( - t.states, - slot, - FRAME_SAB_SLOT_STATE_FREE, - FRAME_SAB_SLOT_STATE_WRITING, - ); - if (prev === FRAME_SAB_SLOT_STATE_FREE) { - t.nextSlot.value = (slot + 1) % t.slotCount; - return { slotIndex: slot, reclaimedReady: false }; - } - } - for (let i = 0; i < t.slotCount; i++) { - const slot = (start + i) % t.slotCount; - const prev = Atomics.compareExchange( - t.states, - slot, - FRAME_SAB_SLOT_STATE_READY, - FRAME_SAB_SLOT_STATE_WRITING, - ); - if (prev === FRAME_SAB_SLOT_STATE_READY) { - t.nextSlot.value = (slot + 1) % t.slotCount; - return { slotIndex: slot, reclaimedReady: true }; - } - } - return { slotIndex: -1, reclaimedReady: false }; -} - -function acquireSabFreeSlot(t: SabFrameTransport): number { - const start = t.nextSlot.value % t.slotCount; - for (let i = 0; i < t.slotCount; i++) { - const slot = (start + i) % t.slotCount; - const prev = Atomics.compareExchange( - t.states, - slot, - FRAME_SAB_SLOT_STATE_FREE, - FRAME_SAB_SLOT_STATE_WRITING, - ); - if (prev === FRAME_SAB_SLOT_STATE_FREE) { - t.nextSlot.value = (slot + 1) % t.slotCount; - return slot; - } - } - return -1; -} - -function publishSabFrame( - t: SabFrameTransport, - frameSeq: number, - slotIndex: number, - slotToken: number, - byteLen: number, -): void { - Atomics.store(t.controlHeader, FRAME_SAB_CONTROL_PUBLISHED_SLOT_WORD, slotIndex); - Atomics.store(t.controlHeader, FRAME_SAB_CONTROL_PUBLISHED_BYTES_WORD, byteLen); - Atomics.store(t.controlHeader, FRAME_SAB_CONTROL_PUBLISHED_TOKEN_WORD, slotToken); - Atomics.store(t.controlHeader, FRAME_SAB_CONTROL_PUBLISHED_SEQ_WORD, frameSeq); -} +export { selectNodeBackendExecutionMode }; export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): NodeBackend { const frameAudit = createFrameAuditLogger("backend"); @@ -532,9 +208,7 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N let exitDef: Deferred | null = null; let nextFrameSeq = 1; - const frameAcceptedWaiters = new Map>(); - const frameCompletionWaiters = new Map>(); - const frameAuditBySeq = new Map(); + const frameTracking = createNodeBackendFrameTrackingState(); const eventQueue: Array< Readonly<{ batch: ArrayBuffer; byteLen: number; droppedSinceLast: number }> @@ -548,200 +222,14 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N // Debug request serialization (no request IDs in protocol) // ============================================================================= - let debugChain: Promise = Promise.resolve(); - let debugEnableDef: Deferred | null = null; - let debugDisableDef: Deferred | null = null; - let debugQueryDef: Deferred<{ headers: Uint8Array; result: DebugQueryResult }> | null = null; - let debugGetPayloadDef: Deferred | null = null; - let debugGetStatsDef: Deferred | null = null; - let debugExportDef: Deferred | null = null; - let debugResetDef: Deferred | null = null; - let perfSnapshotDef: Deferred | null = null; - - function rejectDebugWaiters(err: Error): void { - debugEnableDef?.reject(err); - debugEnableDef = null; - debugDisableDef?.reject(err); - debugDisableDef = null; - debugQueryDef?.reject(err); - debugQueryDef = null; - debugGetPayloadDef?.reject(err); - debugGetPayloadDef = null; - debugGetStatsDef?.reject(err); - debugGetStatsDef = null; - debugExportDef?.reject(err); - debugExportDef = null; - debugResetDef?.reject(err); - debugResetDef = null; - perfSnapshotDef?.reject(err); - perfSnapshotDef = null; - } - - function enqueueDebug(fn: () => Promise): Promise { - const p = debugChain.then(fn, fn); - debugChain = p.then( - () => undefined, - () => undefined, - ); - return p; - } - - function rejectFrameWaiters(err: Error): void { - for (const waiter of frameAcceptedWaiters.values()) { - waiter.reject(err); - } - frameAcceptedWaiters.clear(); - for (const waiter of frameCompletionWaiters.values()) { - waiter.reject(err); - } - frameCompletionWaiters.clear(); - if (frameAudit.enabled) { - for (const [seq, meta] of frameAuditBySeq.entries()) { - frameAudit.emit("frame.aborted", { - reason: err.message, - ageMs: Math.max(0, Date.now() - meta.submitAtMs), - ...meta, - }); - } - frameAuditBySeq.clear(); - } - } - - function registerFrameAudit( - frameSeq: number, - submitPath: "requestFrame" | "beginFrame", - transport: typeof FRAME_TRANSPORT_TRANSFER_V1 | typeof FRAME_TRANSPORT_SAB_V1, - bytes: Uint8Array, - slotIndex?: number, - slotToken?: number, - ): void { - if (!frameAudit.enabled) return; - const fp = drawlistFingerprint(bytes); - const meta: FrameAuditEntry = { - frameSeq, - submitAtMs: Date.now(), - submitPath, - transport, - byteLen: fp.byteLen, - hash32: fp.hash32, - prefixHash32: fp.prefixHash32, - cmdCount: fp.cmdCount, - totalSize: fp.totalSize, - head16: fp.head16, - tail16: fp.tail16, - ...(slotIndex === undefined ? {} : { slotIndex }), - ...(slotToken === undefined ? {} : { slotToken }), - }; - frameAuditBySeq.set(frameSeq, meta); - maybeDumpDrawlistBytes("backend", submitPath, frameSeq, bytes); - frameAudit.emit("frame.submitted", meta); - } - - function markAcceptedFramesUpTo(acceptedSeq: number): void { - if (!frameAudit.enabled) return; - for (const [seq, meta] of frameAuditBySeq.entries()) { - if (seq > acceptedSeq) continue; - if (meta.acceptedLogged === true) continue; - frameAudit.emit("frame.accepted", { - acceptedSeq, - ageMs: Math.max(0, Date.now() - meta.submitAtMs), - ...meta, - }); - meta.acceptedLogged = true; - } - } - - function markCoalescedFramesBefore(acceptedSeq: number): void { - if (!frameAudit.enabled) return; - for (const [seq, meta] of frameAuditBySeq.entries()) { - if (seq >= acceptedSeq) continue; - frameAudit.emit("frame.coalesced", { - acceptedSeq, - ageMs: Math.max(0, Date.now() - meta.submitAtMs), - ...meta, - }); - frameAuditBySeq.delete(seq); - } - } - - function markCompletedFrame(frameSeq: number, completedResult: number): void { - if (!frameAudit.enabled) return; - const meta = frameAuditBySeq.get(frameSeq); - frameAudit.emit("frame.completed", { - completedResult, - ageMs: meta ? Math.max(0, Date.now() - meta.submitAtMs) : null, - ...(meta ?? {}), - }); - frameAuditBySeq.delete(frameSeq); - } - - function resolveAcceptedFramesUpTo(acceptedSeq: number): void { - if (!Number.isInteger(acceptedSeq) || acceptedSeq <= 0) return; - markAcceptedFramesUpTo(acceptedSeq); - for (const [seq, waiter] of frameAcceptedWaiters.entries()) { - if (seq > acceptedSeq) continue; - frameAcceptedWaiters.delete(seq); - waiter.resolve(undefined); - } - } - - function resolveCoalescedCompletionFramesUpTo(acceptedSeq: number): void { - if (!Number.isInteger(acceptedSeq) || acceptedSeq <= 0) return; - markCoalescedFramesBefore(acceptedSeq); - for (const [seq, waiter] of frameCompletionWaiters.entries()) { - if (seq >= acceptedSeq) continue; - frameCompletionWaiters.delete(seq); - waiter.resolve(undefined); - } - } - - function settleCompletedFrame(frameSeq: number, completedResult: number): void { - markCompletedFrame(frameSeq, completedResult); - const waiter = frameCompletionWaiters.get(frameSeq); - if (waiter === undefined) return; - frameCompletionWaiters.delete(frameSeq); - if (completedResult < 0) { - waiter.reject( - new ZrUiError( - "ZRUI_BACKEND_ERROR", - `engine frame completion failed: seq=${String(frameSeq)} code=${String(completedResult)}`, - ), - ); - return; - } - waiter.resolve(undefined); - } - - function reserveFramePromise( - frameSeq: number, - ): Promise & Partial>> { - const frameAcceptedDef = deferred(); - frameAcceptedWaiters.set(frameSeq, frameAcceptedDef); - const frameCompletionDef = deferred(); - frameCompletionWaiters.set(frameSeq, frameCompletionDef); - const framePromise = frameCompletionDef.promise as Promise & - Partial>>; - Object.defineProperty(framePromise, FRAME_ACCEPTED_ACK_MARKER, { - value: frameAcceptedDef.promise, - configurable: false, - enumerable: false, - writable: false, - }); - return framePromise; - } - - function releaseFrameReservation(frameSeq: number): void { - frameAcceptedWaiters.delete(frameSeq); - frameCompletionWaiters.delete(frameSeq); - if (frameAudit.enabled) frameAuditBySeq.delete(frameSeq); - } + const debugChannel = createNodeBackendDebugChannelState(); function failAll(err: Error): void { while (eventWaiters.length > 0) eventWaiters.shift()?.reject(err); while (capsWaiters.length > 0) capsWaiters.shift()?.reject(err); eventQueue.length = 0; - rejectFrameWaiters(err); - rejectDebugWaiters(err); + rejectFrameWaiters(frameTracking, frameAudit, err); + rejectDebugWaiters(debugChannel, err); if (startDef !== null && !startSettled) { startSettled = true; @@ -757,8 +245,8 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N while (eventWaiters.length > 0) eventWaiters.shift()?.reject(err); while (capsWaiters.length > 0) capsWaiters.shift()?.reject(err); eventQueue.length = 0; - rejectFrameWaiters(err); - rejectDebugWaiters(err); + rejectFrameWaiters(frameTracking, frameAudit, err); + rejectDebugWaiters(debugChannel, err); if (startDef !== null && !startSettled) { startSettled = true; @@ -809,8 +297,8 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N failAll(fatal); return; } - resolveAcceptedFramesUpTo(msg.acceptedSeq); - resolveCoalescedCompletionFramesUpTo(msg.acceptedSeq); + resolveAcceptedFramesUpTo(frameTracking, frameAudit, msg.acceptedSeq); + resolveCoalescedCompletionFramesUpTo(frameTracking, frameAudit, msg.acceptedSeq); if (msg.completedSeq !== undefined) { if (!Number.isInteger(msg.completedSeq) || msg.completedSeq <= 0) { @@ -830,7 +318,7 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N failAll(fatal); return; } - settleCompletedFrame(msg.completedSeq, completedResult); + settleCompletedFrame(frameTracking, frameAudit, msg.completedSeq, completedResult); if (completedResult < 0) { fatal = new ZrUiError( "ZRUI_BACKEND_ERROR", @@ -912,14 +400,14 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N } case "debug:enableResult": { - debugEnableDef?.resolve(msg.result); - debugEnableDef = null; + debugChannel.debugEnableDef?.resolve(msg.result); + debugChannel.debugEnableDef = null; return; } case "debug:disableResult": { - debugDisableDef?.resolve(msg.result); - debugDisableDef = null; + debugChannel.debugDisableDef?.resolve(msg.result); + debugChannel.debugDisableDef = null; return; } @@ -944,8 +432,8 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N newestRecordId: BigInt(msg.result.newestRecordId), recordsDropped: msg.result.recordsDropped, }; - debugQueryDef?.resolve({ headers, result }); - debugQueryDef = null; + debugChannel.debugQueryDef?.resolve({ headers, result }); + debugChannel.debugQueryDef = null; return; } catch (err) { fatal = new ZrUiError("ZRUI_BACKEND_ERROR", safeErr(err).message); @@ -956,8 +444,8 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N case "debug:getPayloadResult": { if (msg.result <= 0 || msg.payloadByteLen <= 0) { - debugGetPayloadDef?.resolve(null); - debugGetPayloadDef = null; + debugChannel.debugGetPayloadDef?.resolve(null); + debugChannel.debugGetPayloadDef = null; return; } try { @@ -971,8 +459,10 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N `debug:getPayloadResult: payloadByteLen=${String(msg.payloadByteLen)} exceeds buffer=${String(msg.payload.byteLength)}`, ); } - debugGetPayloadDef?.resolve(new Uint8Array(msg.payload, 0, msg.payloadByteLen)); - debugGetPayloadDef = null; + debugChannel.debugGetPayloadDef?.resolve( + new Uint8Array(msg.payload, 0, msg.payloadByteLen), + ); + debugChannel.debugGetPayloadDef = null; return; } catch (err) { fatal = new ZrUiError("ZRUI_BACKEND_ERROR", safeErr(err).message); @@ -991,8 +481,8 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N currentRingUsage: msg.stats.currentRingUsage, ringCapacity: msg.stats.ringCapacity, }; - debugGetStatsDef?.resolve(stats); - debugGetStatsDef = null; + debugChannel.debugGetStatsDef?.resolve(stats); + debugChannel.debugGetStatsDef = null; return; } catch (err) { fatal = new ZrUiError("ZRUI_BACKEND_ERROR", safeErr(err).message); @@ -1013,8 +503,8 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N `debug:exportResult: bufferByteLen=${String(msg.bufferByteLen)} exceeds buffer=${String(msg.buffer.byteLength)}`, ); } - debugExportDef?.resolve(new Uint8Array(msg.buffer, 0, msg.bufferByteLen)); - debugExportDef = null; + debugChannel.debugExportDef?.resolve(new Uint8Array(msg.buffer, 0, msg.bufferByteLen)); + debugChannel.debugExportDef = null; return; } catch (err) { fatal = new ZrUiError("ZRUI_BACKEND_ERROR", safeErr(err).message); @@ -1024,14 +514,14 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N } case "debug:resetResult": { - debugResetDef?.resolve(msg.result); - debugResetDef = null; + debugChannel.debugResetDef?.resolve(msg.result); + debugChannel.debugResetDef = null; return; } case "perf:snapshotResult": { - perfSnapshotDef?.resolve(msg.snapshot); - perfSnapshotDef = null; + debugChannel.perfSnapshotDef?.resolve(msg.snapshot); + debugChannel.perfSnapshotDef = null; return; } @@ -1207,8 +697,8 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N while (eventWaiters.length > 0) eventWaiters.shift()?.reject(err); while (capsWaiters.length > 0) capsWaiters.shift()?.reject(err); eventQueue.length = 0; - rejectFrameWaiters(err); - rejectDebugWaiters(err); + rejectFrameWaiters(frameTracking, frameAudit, err); + rejectDebugWaiters(debugChannel, err); if (startDef !== null && !startSettled) { startSettled = true; @@ -1230,13 +720,15 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N if (worker === null) return Promise.reject(new Error("NodeBackend: worker not available")); const frameSeq = nextFrameSeq++; - const framePromise = reserveFramePromise(frameSeq); + const framePromise = reserveFramePromise(frameTracking, frameSeq); if (sabFrameTransport !== null && drawlist.byteLength <= sabFrameTransport.slotBytes) { const slotIndex = acquireSabSlot(sabFrameTransport); if (slotIndex >= 0) { const slotToken = frameSeqToSlotToken(frameSeq); registerFrameAudit( + frameTracking, + frameAudit, frameSeq, "requestFrame", FRAME_TRANSPORT_SAB_V1, @@ -1276,7 +768,14 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N // - completion promise settles on worker completion/coalescing status const buf = new ArrayBuffer(drawlist.byteLength); copyInto(buf, drawlist); - registerFrameAudit(frameSeq, "requestFrame", FRAME_TRANSPORT_TRANSFER_V1, drawlist); + registerFrameAudit( + frameTracking, + frameAudit, + frameSeq, + "requestFrame", + FRAME_TRANSPORT_TRANSFER_V1, + drawlist, + ); try { send( { @@ -1289,7 +788,7 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N [buf], ); } catch (err) { - releaseFrameReservation(frameSeq); + releaseFrameReservation(frameTracking, frameAudit, frameSeq); if (frameAudit.enabled) { frameAudit.emit("frame.transfer.publish_error", { frameSeq, @@ -1374,14 +873,16 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N const debug: DebugBackend = { debugEnable: (config: DebugConfig) => - enqueueDebug(async () => { + enqueueDebug(debugChannel, async () => { if (disposed) throw new Error("NodeBackend: disposed"); if (fatal !== null) throw fatal; await backend.start(); if (worker === null) throw new Error("NodeBackend: worker not available"); - if (debugEnableDef !== null) throw new Error("NodeBackend: debugEnable already in-flight"); - debugEnableDef = deferred(); + if (debugChannel.debugEnableDef !== null) { + throw new Error("NodeBackend: debugEnable already in-flight"); + } + debugChannel.debugEnableDef = deferred(); const minSeverity = config.minSeverity !== undefined ? severityToNum(config.minSeverity) : null; @@ -1401,26 +902,26 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N send({ type: "debug:enable", config: configWire }); - const rc = await debugEnableDef.promise; - debugEnableDef = null; + const rc = await debugChannel.debugEnableDef.promise; + debugChannel.debugEnableDef = null; if (rc < 0) { throw new ZrUiError("ZRUI_BACKEND_ERROR", `engineDebugEnable failed: code=${String(rc)}`); } }), debugDisable: () => - enqueueDebug(async () => { + enqueueDebug(debugChannel, async () => { if (disposed) throw new Error("NodeBackend: disposed"); if (fatal !== null) throw fatal; await backend.start(); if (worker === null) throw new Error("NodeBackend: worker not available"); - if (debugDisableDef !== null) + if (debugChannel.debugDisableDef !== null) throw new Error("NodeBackend: debugDisable already in-flight"); - debugDisableDef = deferred(); + debugChannel.debugDisableDef = deferred(); send({ type: "debug:disable" }); - const rc = await debugDisableDef.promise; - debugDisableDef = null; + const rc = await debugChannel.debugDisableDef.promise; + debugChannel.debugDisableDef = null; if (rc < 0) { throw new ZrUiError( "ZRUI_BACKEND_ERROR", @@ -1430,14 +931,16 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N }), debugQuery: (query: DebugQuery) => - enqueueDebug(async () => { + enqueueDebug(debugChannel, async () => { if (disposed) throw new Error("NodeBackend: disposed"); if (fatal !== null) throw fatal; await backend.start(); if (worker === null) throw new Error("NodeBackend: worker not available"); - if (debugQueryDef !== null) throw new Error("NodeBackend: debugQuery already in-flight"); - debugQueryDef = deferred<{ headers: Uint8Array; result: DebugQueryResult }>(); + if (debugChannel.debugQueryDef !== null) { + throw new Error("NodeBackend: debugQuery already in-flight"); + } + debugChannel.debugQueryDef = deferred<{ headers: Uint8Array; result: DebugQueryResult }>(); const minSeverity = query.minSeverity !== undefined ? severityToNum(query.minSeverity) : null; @@ -1460,22 +963,22 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N send({ type: "debug:query", query: queryWire, headersCap }); - const out = await debugQueryDef.promise; - debugQueryDef = null; + const out = await debugChannel.debugQueryDef.promise; + debugChannel.debugQueryDef = null; return out; }), debugGetPayload: (recordId: bigint) => - enqueueDebug(async () => { + enqueueDebug(debugChannel, async () => { if (disposed) throw new Error("NodeBackend: disposed"); if (fatal !== null) throw fatal; await backend.start(); if (worker === null) throw new Error("NodeBackend: worker not available"); - if (debugGetPayloadDef !== null) { + if (debugChannel.debugGetPayloadDef !== null) { throw new Error("NodeBackend: debugGetPayload already in-flight"); } - debugGetPayloadDef = deferred(); + debugChannel.debugGetPayloadDef = deferred(); // Payloads can include raw drawlist bytes. Default to 4 MiB. const payloadCap = 1 << 22; @@ -1485,54 +988,58 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N payloadCap, }); - const bytes = await debugGetPayloadDef.promise; - debugGetPayloadDef = null; + const bytes = await debugChannel.debugGetPayloadDef.promise; + debugChannel.debugGetPayloadDef = null; return bytes; }), debugGetStats: () => - enqueueDebug(async () => { + enqueueDebug(debugChannel, async () => { if (disposed) throw new Error("NodeBackend: disposed"); if (fatal !== null) throw fatal; await backend.start(); if (worker === null) throw new Error("NodeBackend: worker not available"); - if (debugGetStatsDef !== null) + if (debugChannel.debugGetStatsDef !== null) throw new Error("NodeBackend: debugGetStats already in-flight"); - debugGetStatsDef = deferred(); + debugChannel.debugGetStatsDef = deferred(); send({ type: "debug:getStats" }); - const stats = await debugGetStatsDef.promise; - debugGetStatsDef = null; + const stats = await debugChannel.debugGetStatsDef.promise; + debugChannel.debugGetStatsDef = null; return stats; }), debugExport: () => - enqueueDebug(async () => { + enqueueDebug(debugChannel, async () => { if (disposed) throw new Error("NodeBackend: disposed"); if (fatal !== null) throw fatal; await backend.start(); if (worker === null) throw new Error("NodeBackend: worker not available"); - if (debugExportDef !== null) throw new Error("NodeBackend: debugExport already in-flight"); - debugExportDef = deferred(); + if (debugChannel.debugExportDef !== null) { + throw new Error("NodeBackend: debugExport already in-flight"); + } + debugChannel.debugExportDef = deferred(); send({ type: "debug:export", bufferCap: 1 << 23 }); // 8 MiB - const bytes = await debugExportDef.promise; - debugExportDef = null; + const bytes = await debugChannel.debugExportDef.promise; + debugChannel.debugExportDef = null; return bytes; }), debugReset: () => - enqueueDebug(async () => { + enqueueDebug(debugChannel, async () => { if (disposed) throw new Error("NodeBackend: disposed"); if (fatal !== null) throw fatal; await backend.start(); if (worker === null) throw new Error("NodeBackend: worker not available"); - if (debugResetDef !== null) throw new Error("NodeBackend: debugReset already in-flight"); - debugResetDef = deferred(); + if (debugChannel.debugResetDef !== null) { + throw new Error("NodeBackend: debugReset already in-flight"); + } + debugChannel.debugResetDef = deferred(); send({ type: "debug:reset" }); - const rc = await debugResetDef.promise; - debugResetDef = null; + const rc = await debugChannel.debugResetDef.promise; + debugChannel.debugResetDef = null; if (rc < 0) { throw new ZrUiError("ZRUI_BACKEND_ERROR", `engineDebugReset failed: code=${String(rc)}`); } @@ -1541,18 +1048,19 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N const perf: NodeBackendPerf = { perfSnapshot: () => - enqueueDebug(async () => { + enqueueDebug(debugChannel, async () => { if (disposed) throw new Error("NodeBackend: disposed"); if (fatal !== null) throw fatal; await backend.start(); if (worker === null) throw new Error("NodeBackend: worker not available"); - if (perfSnapshotDef !== null) + if (debugChannel.perfSnapshotDef !== null) throw new Error("NodeBackend: perfSnapshot already in-flight"); - perfSnapshotDef = deferred(); + debugChannel.perfSnapshotDef = + deferred>>(); send({ type: "perf:snapshot" }); - const snapshot = await perfSnapshotDef.promise; - perfSnapshotDef = null; + const snapshot = await debugChannel.perfSnapshotDef.promise; + debugChannel.perfSnapshotDef = null; return snapshot; }), }; @@ -1637,9 +1145,11 @@ export function createNodeBackendInternal(opts: NodeBackendInternalOpts = {}): N } const frameSeq = nextFrameSeq++; - const framePromise = reserveFramePromise(frameSeq); + const framePromise = reserveFramePromise(frameTracking, frameSeq); const slotToken = frameSeqToSlotToken(frameSeq); registerFrameAudit( + frameTracking, + frameAudit, frameSeq, "beginFrame", FRAME_TRANSPORT_SAB_V1, diff --git a/packages/node/src/backend/nodeBackend/debugChannel.ts b/packages/node/src/backend/nodeBackend/debugChannel.ts new file mode 100644 index 00000000..f846a513 --- /dev/null +++ b/packages/node/src/backend/nodeBackend/debugChannel.ts @@ -0,0 +1,60 @@ +import type { DebugQueryResult, DebugStats } from "@rezi-ui/core"; +import type { PerfSnapshotWire } from "../../worker/protocol.js"; +import type { Deferred } from "./shared.js"; + +export type NodeBackendDebugChannelState = { + debugChain: Promise; + debugEnableDef: Deferred | null; + debugDisableDef: Deferred | null; + debugQueryDef: Deferred<{ headers: Uint8Array; result: DebugQueryResult }> | null; + debugGetPayloadDef: Deferred | null; + debugGetStatsDef: Deferred | null; + debugExportDef: Deferred | null; + debugResetDef: Deferred | null; + perfSnapshotDef: Deferred | null; +}; + +export function createNodeBackendDebugChannelState(): NodeBackendDebugChannelState { + return { + debugChain: Promise.resolve(), + debugEnableDef: null, + debugDisableDef: null, + debugQueryDef: null, + debugGetPayloadDef: null, + debugGetStatsDef: null, + debugExportDef: null, + debugResetDef: null, + perfSnapshotDef: null, + }; +} + +export function rejectDebugWaiters(state: NodeBackendDebugChannelState, err: Error): void { + state.debugEnableDef?.reject(err); + state.debugEnableDef = null; + state.debugDisableDef?.reject(err); + state.debugDisableDef = null; + state.debugQueryDef?.reject(err); + state.debugQueryDef = null; + state.debugGetPayloadDef?.reject(err); + state.debugGetPayloadDef = null; + state.debugGetStatsDef?.reject(err); + state.debugGetStatsDef = null; + state.debugExportDef?.reject(err); + state.debugExportDef = null; + state.debugResetDef?.reject(err); + state.debugResetDef = null; + state.perfSnapshotDef?.reject(err); + state.perfSnapshotDef = null; +} + +export function enqueueDebug( + state: NodeBackendDebugChannelState, + fn: () => Promise, +): Promise { + const p = state.debugChain.then(fn, fn); + state.debugChain = p.then( + () => undefined, + () => undefined, + ); + return p; +} diff --git a/packages/node/src/backend/nodeBackend/executionMode.ts b/packages/node/src/backend/nodeBackend/executionMode.ts new file mode 100644 index 00000000..48bb6c7f --- /dev/null +++ b/packages/node/src/backend/nodeBackend/executionMode.ts @@ -0,0 +1,66 @@ +import { existsSync } from "node:fs"; +import type { WorkerOptions } from "node:worker_threads"; +import { ZrUiError } from "@rezi-ui/core"; +import type { + NodeBackendExecutionModeSelection, + NodeBackendExecutionModeSelectionInput, +} from "./shared.js"; + +export type WorkerEntryResolution = Readonly<{ + entry: URL; + options: WorkerOptions; +}>; + +export function resolveWorkerEntry(workerData: WorkerOptions["workerData"]): WorkerEntryResolution { + const options: WorkerOptions = { workerData }; + const workerEntryJs = new URL("../../worker/engineWorker.js", import.meta.url); + if (existsSync(workerEntryJs)) { + return { entry: workerEntryJs, options }; + } + + // Source-mode worktrees do not emit sibling .js worker files under src. + // Use a JS bootstrap that registers tsx and then imports engineWorker.ts. + const workerEntryBootstrapJs = new URL("../../worker/engineWorker.bootstrap.js", import.meta.url); + if (existsSync(workerEntryBootstrapJs)) { + return { entry: workerEntryBootstrapJs, options }; + } + + throw new ZrUiError( + "ZRUI_BACKEND_ERROR", + "Unable to locate worker entry (expected engineWorker.js or engineWorker.bootstrap.js)", + ); +} + +export function hasInteractiveTty(): boolean { + return ( + process.stdin.isTTY === true || process.stdout.isTTY === true || process.stderr.isTTY === true + ); +} + +export function selectNodeBackendExecutionMode( + input: NodeBackendExecutionModeSelectionInput, +): NodeBackendExecutionModeSelection { + const { requestedExecutionMode, fpsCap } = input; + const resolvedExecutionMode: "worker" | "inline" = + requestedExecutionMode === "inline" + ? "inline" + : requestedExecutionMode === "worker" + ? "worker" + : fpsCap <= 30 + ? "inline" + : "worker"; + return { + resolvedExecutionMode, + selectedExecutionMode: resolvedExecutionMode, + fallbackReason: null, + }; +} + +export function assertWorkerEnvironmentSupported(nativeShimModule: string | undefined): void { + if (nativeShimModule !== undefined) return; + if (hasInteractiveTty()) return; + throw new ZrUiError( + "ZRUI_BACKEND_ERROR", + 'Worker backend requires a TTY when using @rezi-ui/native. Use `executionMode: "inline"` for headless runs or pass `nativeShimModule` in test harnesses.', + ); +} diff --git a/packages/node/src/backend/nodeBackend/frameTracking.ts b/packages/node/src/backend/nodeBackend/frameTracking.ts new file mode 100644 index 00000000..cbd9fbbd --- /dev/null +++ b/packages/node/src/backend/nodeBackend/frameTracking.ts @@ -0,0 +1,202 @@ +import { FRAME_ACCEPTED_ACK_MARKER, ZrUiError } from "@rezi-ui/core"; +import type { FrameAuditLogger } from "../../frameAudit.js"; +import { drawlistFingerprint, maybeDumpDrawlistBytes } from "../../frameAudit.js"; +import type { Deferred } from "./shared.js"; +import { deferred } from "./shared.js"; + +export type FrameAuditEntry = { + frameSeq: number; + submitAtMs: number; + submitPath: "requestFrame" | "beginFrame"; + transport: string; + byteLen: number; + hash32: string; + prefixHash32: string; + cmdCount: number | null; + totalSize: number | null; + head16: string; + tail16: string; + slotIndex?: number; + slotToken?: number; + acceptedLogged?: boolean; +}; + +export type NodeBackendFrameTrackingState = Readonly<{ + frameAcceptedWaiters: Map>; + frameCompletionWaiters: Map>; + frameAuditBySeq: Map; +}>; + +export function createNodeBackendFrameTrackingState(): NodeBackendFrameTrackingState { + return { + frameAcceptedWaiters: new Map>(), + frameCompletionWaiters: new Map>(), + frameAuditBySeq: new Map(), + }; +} + +export function rejectFrameWaiters( + state: NodeBackendFrameTrackingState, + frameAudit: FrameAuditLogger, + err: Error, +): void { + for (const waiter of state.frameAcceptedWaiters.values()) { + waiter.reject(err); + } + state.frameAcceptedWaiters.clear(); + for (const waiter of state.frameCompletionWaiters.values()) { + waiter.reject(err); + } + state.frameCompletionWaiters.clear(); + if (frameAudit.enabled) { + for (const [seq, meta] of state.frameAuditBySeq.entries()) { + frameAudit.emit("frame.aborted", { + reason: err.message, + ageMs: Math.max(0, Date.now() - meta.submitAtMs), + ...meta, + }); + } + state.frameAuditBySeq.clear(); + } +} + +export function registerFrameAudit( + state: NodeBackendFrameTrackingState, + frameAudit: FrameAuditLogger, + frameSeq: number, + submitPath: "requestFrame" | "beginFrame", + transport: string, + bytes: Uint8Array, + slotIndex?: number, + slotToken?: number, +): void { + if (!frameAudit.enabled) return; + const fp = drawlistFingerprint(bytes); + const meta: FrameAuditEntry = { + frameSeq, + submitAtMs: Date.now(), + submitPath, + transport, + byteLen: fp.byteLen, + hash32: fp.hash32, + prefixHash32: fp.prefixHash32, + cmdCount: fp.cmdCount, + totalSize: fp.totalSize, + head16: fp.head16, + tail16: fp.tail16, + ...(slotIndex === undefined ? {} : { slotIndex }), + ...(slotToken === undefined ? {} : { slotToken }), + }; + state.frameAuditBySeq.set(frameSeq, meta); + maybeDumpDrawlistBytes("backend", submitPath, frameSeq, bytes); + frameAudit.emit("frame.submitted", meta); +} + +export function resolveAcceptedFramesUpTo( + state: NodeBackendFrameTrackingState, + frameAudit: FrameAuditLogger, + acceptedSeq: number, +): void { + if (!Number.isInteger(acceptedSeq) || acceptedSeq <= 0) return; + if (frameAudit.enabled) { + for (const [seq, meta] of state.frameAuditBySeq.entries()) { + if (seq > acceptedSeq) continue; + if (meta.acceptedLogged === true) continue; + frameAudit.emit("frame.accepted", { + acceptedSeq, + ageMs: Math.max(0, Date.now() - meta.submitAtMs), + ...meta, + }); + meta.acceptedLogged = true; + } + } + for (const [seq, waiter] of state.frameAcceptedWaiters.entries()) { + if (seq > acceptedSeq) continue; + state.frameAcceptedWaiters.delete(seq); + waiter.resolve(undefined); + } +} + +export function resolveCoalescedCompletionFramesUpTo( + state: NodeBackendFrameTrackingState, + frameAudit: FrameAuditLogger, + acceptedSeq: number, +): void { + if (!Number.isInteger(acceptedSeq) || acceptedSeq <= 0) return; + if (frameAudit.enabled) { + for (const [seq, meta] of state.frameAuditBySeq.entries()) { + if (seq >= acceptedSeq) continue; + frameAudit.emit("frame.coalesced", { + acceptedSeq, + ageMs: Math.max(0, Date.now() - meta.submitAtMs), + ...meta, + }); + state.frameAuditBySeq.delete(seq); + } + } + for (const [seq, waiter] of state.frameCompletionWaiters.entries()) { + if (seq >= acceptedSeq) continue; + state.frameCompletionWaiters.delete(seq); + waiter.resolve(undefined); + } +} + +export function settleCompletedFrame( + state: NodeBackendFrameTrackingState, + frameAudit: FrameAuditLogger, + frameSeq: number, + completedResult: number, +): void { + if (frameAudit.enabled) { + const meta = state.frameAuditBySeq.get(frameSeq); + frameAudit.emit("frame.completed", { + completedResult, + ageMs: meta ? Math.max(0, Date.now() - meta.submitAtMs) : null, + ...(meta ?? {}), + }); + state.frameAuditBySeq.delete(frameSeq); + } + + const waiter = state.frameCompletionWaiters.get(frameSeq); + if (waiter === undefined) return; + state.frameCompletionWaiters.delete(frameSeq); + if (completedResult < 0) { + waiter.reject( + new ZrUiError( + "ZRUI_BACKEND_ERROR", + `engine frame completion failed: seq=${String(frameSeq)} code=${String(completedResult)}`, + ), + ); + return; + } + waiter.resolve(undefined); +} + +export function reserveFramePromise( + state: NodeBackendFrameTrackingState, + frameSeq: number, +): Promise & Partial>> { + const frameAcceptedDef = deferred(); + state.frameAcceptedWaiters.set(frameSeq, frameAcceptedDef); + const frameCompletionDef = deferred(); + state.frameCompletionWaiters.set(frameSeq, frameCompletionDef); + const framePromise = frameCompletionDef.promise as Promise & + Partial>>; + Object.defineProperty(framePromise, FRAME_ACCEPTED_ACK_MARKER, { + value: frameAcceptedDef.promise, + configurable: false, + enumerable: false, + writable: false, + }); + return framePromise; +} + +export function releaseFrameReservation( + state: NodeBackendFrameTrackingState, + frameAudit: FrameAuditLogger, + frameSeq: number, +): void { + state.frameAcceptedWaiters.delete(frameSeq); + state.frameCompletionWaiters.delete(frameSeq); + if (frameAudit.enabled) state.frameAuditBySeq.delete(frameSeq); +} diff --git a/packages/node/src/backend/nodeBackend/frameTransport.ts b/packages/node/src/backend/nodeBackend/frameTransport.ts new file mode 100644 index 00000000..6d738e67 --- /dev/null +++ b/packages/node/src/backend/nodeBackend/frameTransport.ts @@ -0,0 +1,192 @@ +import { + FRAME_SAB_CONTROL_CONSUMED_SEQ_WORD, + FRAME_SAB_CONTROL_HEADER_WORDS, + FRAME_SAB_CONTROL_PUBLISHED_BYTES_WORD, + FRAME_SAB_CONTROL_PUBLISHED_SEQ_WORD, + FRAME_SAB_CONTROL_PUBLISHED_SLOT_WORD, + FRAME_SAB_CONTROL_PUBLISHED_TOKEN_WORD, + FRAME_SAB_CONTROL_WORDS_PER_SLOT, + FRAME_SAB_SLOT_STATE_FREE, + FRAME_SAB_SLOT_STATE_READY, + FRAME_SAB_SLOT_STATE_WRITING, +} from "../../worker/protocol.js"; + +export const FRAME_SAB_SLOT_COUNT_DEFAULT = 8 as const; +export const FRAME_SAB_SLOT_BYTES_DEFAULT = 1 << 20; + +export type SabFrameTransport = Readonly<{ + control: SharedArrayBuffer; + data: SharedArrayBuffer; + slotCount: number; + slotBytes: number; + controlHeader: Int32Array; + states: Int32Array; + tokens: Int32Array; + dataBytes: Uint8Array; + nextSlot: { value: number }; +}>; + +export type SabSlotAcquireResult = Readonly<{ + slotIndex: number; + reclaimedReady: boolean; +}>; + +export function copyInto(buf: ArrayBuffer, bytes: Uint8Array): void { + new Uint8Array(buf, 0, bytes.byteLength).set(bytes); +} + +export function frameSeqToSlotToken(frameSeq: number): number { + const token = frameSeq & 0x7fff_ffff; + return token === 0 ? 1 : token; +} + +export function createSabFrameTransport( + slotCount: number, + slotBytes: number, +): SabFrameTransport | null { + if (typeof SharedArrayBuffer !== "function") return null; + const control = new SharedArrayBuffer( + (FRAME_SAB_CONTROL_HEADER_WORDS + slotCount * FRAME_SAB_CONTROL_WORDS_PER_SLOT) * + Int32Array.BYTES_PER_ELEMENT, + ); + const controlHeader = new Int32Array(control, 0, FRAME_SAB_CONTROL_HEADER_WORDS); + const states = new Int32Array( + control, + FRAME_SAB_CONTROL_HEADER_WORDS * Int32Array.BYTES_PER_ELEMENT, + slotCount, + ); + const tokens = new Int32Array( + control, + (FRAME_SAB_CONTROL_HEADER_WORDS + slotCount) * Int32Array.BYTES_PER_ELEMENT, + slotCount, + ); + Atomics.store(controlHeader, FRAME_SAB_CONTROL_PUBLISHED_SEQ_WORD, 0); + Atomics.store(controlHeader, FRAME_SAB_CONTROL_PUBLISHED_SLOT_WORD, 0); + Atomics.store(controlHeader, FRAME_SAB_CONTROL_PUBLISHED_BYTES_WORD, 0); + Atomics.store(controlHeader, FRAME_SAB_CONTROL_PUBLISHED_TOKEN_WORD, 0); + Atomics.store(controlHeader, FRAME_SAB_CONTROL_CONSUMED_SEQ_WORD, 0); + for (let i = 0; i < slotCount; i++) { + Atomics.store(states, i, FRAME_SAB_SLOT_STATE_FREE); + Atomics.store(tokens, i, 0); + } + const data = new SharedArrayBuffer(slotCount * slotBytes); + return { + control, + data, + slotCount, + slotBytes, + controlHeader, + states, + tokens, + dataBytes: new Uint8Array(data), + nextSlot: { value: 0 }, + }; +} + +export function resetSabFrameTransport(t: SabFrameTransport): void { + Atomics.store(t.controlHeader, FRAME_SAB_CONTROL_PUBLISHED_SEQ_WORD, 0); + Atomics.store(t.controlHeader, FRAME_SAB_CONTROL_PUBLISHED_SLOT_WORD, 0); + Atomics.store(t.controlHeader, FRAME_SAB_CONTROL_PUBLISHED_BYTES_WORD, 0); + Atomics.store(t.controlHeader, FRAME_SAB_CONTROL_PUBLISHED_TOKEN_WORD, 0); + Atomics.store(t.controlHeader, FRAME_SAB_CONTROL_CONSUMED_SEQ_WORD, 0); + for (let i = 0; i < t.slotCount; i++) { + Atomics.store(t.states, i, FRAME_SAB_SLOT_STATE_FREE); + Atomics.store(t.tokens, i, 0); + } + t.nextSlot.value = 0; +} + +export function acquireSabSlot(t: SabFrameTransport): number { + const start = t.nextSlot.value % t.slotCount; + for (let i = 0; i < t.slotCount; i++) { + const slot = (start + i) % t.slotCount; + const prev = Atomics.compareExchange( + t.states, + slot, + FRAME_SAB_SLOT_STATE_FREE, + FRAME_SAB_SLOT_STATE_WRITING, + ); + if (prev === FRAME_SAB_SLOT_STATE_FREE) { + t.nextSlot.value = (slot + 1) % t.slotCount; + return slot; + } + } + // Latest-wins semantics allow reclaiming stale READY slots instead of + // falling back to transfer under pressure. + for (let i = 0; i < t.slotCount; i++) { + const slot = (start + i) % t.slotCount; + const prev = Atomics.compareExchange( + t.states, + slot, + FRAME_SAB_SLOT_STATE_READY, + FRAME_SAB_SLOT_STATE_WRITING, + ); + if (prev === FRAME_SAB_SLOT_STATE_READY) { + t.nextSlot.value = (slot + 1) % t.slotCount; + return slot; + } + } + return -1; +} + +export function acquireSabSlotTracked(t: SabFrameTransport): SabSlotAcquireResult { + const start = t.nextSlot.value % t.slotCount; + for (let i = 0; i < t.slotCount; i++) { + const slot = (start + i) % t.slotCount; + const prev = Atomics.compareExchange( + t.states, + slot, + FRAME_SAB_SLOT_STATE_FREE, + FRAME_SAB_SLOT_STATE_WRITING, + ); + if (prev === FRAME_SAB_SLOT_STATE_FREE) { + t.nextSlot.value = (slot + 1) % t.slotCount; + return { slotIndex: slot, reclaimedReady: false }; + } + } + for (let i = 0; i < t.slotCount; i++) { + const slot = (start + i) % t.slotCount; + const prev = Atomics.compareExchange( + t.states, + slot, + FRAME_SAB_SLOT_STATE_READY, + FRAME_SAB_SLOT_STATE_WRITING, + ); + if (prev === FRAME_SAB_SLOT_STATE_READY) { + t.nextSlot.value = (slot + 1) % t.slotCount; + return { slotIndex: slot, reclaimedReady: true }; + } + } + return { slotIndex: -1, reclaimedReady: false }; +} + +export function acquireSabFreeSlot(t: SabFrameTransport): number { + const start = t.nextSlot.value % t.slotCount; + for (let i = 0; i < t.slotCount; i++) { + const slot = (start + i) % t.slotCount; + const prev = Atomics.compareExchange( + t.states, + slot, + FRAME_SAB_SLOT_STATE_FREE, + FRAME_SAB_SLOT_STATE_WRITING, + ); + if (prev === FRAME_SAB_SLOT_STATE_FREE) { + t.nextSlot.value = (slot + 1) % t.slotCount; + return slot; + } + } + return -1; +} + +export function publishSabFrame( + t: SabFrameTransport, + frameSeq: number, + slotIndex: number, + slotToken: number, + byteLen: number, +): void { + Atomics.store(t.controlHeader, FRAME_SAB_CONTROL_PUBLISHED_SLOT_WORD, slotIndex); + Atomics.store(t.controlHeader, FRAME_SAB_CONTROL_PUBLISHED_BYTES_WORD, byteLen); + Atomics.store(t.controlHeader, FRAME_SAB_CONTROL_PUBLISHED_TOKEN_WORD, slotToken); + Atomics.store(t.controlHeader, FRAME_SAB_CONTROL_PUBLISHED_SEQ_WORD, frameSeq); +} diff --git a/packages/node/src/backend/nodeBackend/shared.ts b/packages/node/src/backend/nodeBackend/shared.ts new file mode 100644 index 00000000..0149ea9a --- /dev/null +++ b/packages/node/src/backend/nodeBackend/shared.ts @@ -0,0 +1,108 @@ +import type { DebugBackend, RuntimeBackend } from "@rezi-ui/core"; + +export type NodeBackendConfig = Readonly<{ + /** + * Runtime execution mode: + * - "auto": pick inline only for very low fps caps (<=30), worker otherwise + * - "worker": worker-thread engine ownership + * - "inline": single-thread inline backend (no worker-hop transport) + */ + executionMode?: "auto" | "worker" | "inline"; + /** + * @deprecated Prefer createNodeApp({ config: { fpsCap } }) so app/core and backend + * remain aligned by construction. + */ + fpsCap?: number; + /** + * @deprecated Prefer createNodeApp({ config: { maxEventBytes } }) so app/core and backend + * remain aligned by construction. + */ + maxEventBytes?: number; + /** + * Frame transport mode: + * - "auto": prefer SAB mailbox transport when available, fallback to transfer. + * - "transfer": always use transferable ArrayBuffer path. + * - "sab": require SAB mailbox path when available, fallback to transfer when unavailable. + */ + frameTransport?: "auto" | "transfer" | "sab"; + /** SAB mailbox slot count (default: 8). */ + frameSabSlotCount?: number; + /** SAB mailbox bytes per slot (default: 1 MiB). */ + frameSabSlotBytes?: number; + /** + * Extra native `engine_create` configuration passed through to the addon (e.g. `limits`). + * Keys are forwarded as-is (camelCase or snake_case accepted by the native parser). + */ + nativeConfig?: Readonly>; + /** + * Emoji width policy used to keep core layout measurement and native rendering aligned. + * - "auto": use native/env overrides; optional probe when `ZRUI_EMOJI_WIDTH_PROBE=1` + * then fallback to deterministic "wide" + * - "wide": emoji clusters consume 2 cells + * - "narrow": emoji clusters consume 1 cell + * + * This sets core text measurement policy and native `widthPolicy` together. + */ + emojiWidthPolicy?: "auto" | "wide" | "narrow"; +}>; + +export type NodeBackendInternalOpts = Readonly<{ + config?: NodeBackendConfig; + nativeShimModule?: string; +}>; + +export type NodeBackendPerfSnapshot = Readonly<{ + phases: Readonly< + Record< + string, + { + count: number; + avg: number; + p50: number; + p95: number; + p99: number; + max: number; + worst10: readonly number[]; + } + > + >; +}>; + +export type NodeBackendPerf = Readonly<{ + perfSnapshot: () => Promise; +}>; + +export type NodeBackend = RuntimeBackend & Readonly<{ debug: DebugBackend; perf: NodeBackendPerf }>; + +export type NodeBackendExecutionModeSelectionInput = Readonly<{ + requestedExecutionMode: "auto" | "worker" | "inline"; + fpsCap: number; + nativeShimModule?: string; + hasAnyTty: boolean; +}>; + +export type NodeBackendExecutionModeSelection = Readonly<{ + resolvedExecutionMode: "worker" | "inline"; + selectedExecutionMode: "worker" | "inline"; + fallbackReason: string | null; +}>; + +export type Deferred = Readonly<{ + promise: Promise; + resolve: (value: T) => void; + reject: (err: Error) => void; +}>; + +export function deferred(): Deferred { + let resolve!: (value: T) => void; + let reject!: (err: Error) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = (err: unknown) => rej(err instanceof Error ? err : new Error(String(err))); + }); + return { promise, resolve, reject }; +} + +export function safeErr(err: unknown): Error { + return err instanceof Error ? err : new Error(String(err)); +} diff --git a/packages/node/src/backend/nodeBackendInline.ts b/packages/node/src/backend/nodeBackendInline.ts index 138d0fc6..80b9181d 100644 --- a/packages/node/src/backend/nodeBackendInline.ts +++ b/packages/node/src/backend/nodeBackendInline.ts @@ -52,18 +52,14 @@ import { import { attachBackendMarkers } from "./backendSharedMarkers.js"; import { applyEmojiWidthPolicy, resolveBackendEmojiWidthPolicy } from "./emojiWidthPolicy.js"; import type { + Deferred, NodeBackend, NodeBackendInternalOpts, NodeBackendPerfSnapshot, -} from "./nodeBackend.js"; +} from "./nodeBackend/shared.js"; +import { deferred, safeErr } from "./nodeBackend/shared.js"; import { terminalProfileFromNodeEnv } from "./terminalProfile.js"; -type Deferred = Readonly<{ - promise: Promise; - resolve: (value: T) => void; - reject: (err: Error) => void; -}>; - type NativeCaps = Readonly<{ colorMode: number; supportsMouse: boolean; @@ -155,20 +151,6 @@ const PERF_MAX_SAMPLES = 1024; type PerfSample = Readonly<{ phase: string; durationMs: number }>; -function deferred(): Deferred { - let resolve!: (value: T) => void; - let reject!: (err: Error) => void; - const promise = new Promise((res, rej) => { - resolve = res; - reject = (err: unknown) => rej(err instanceof Error ? err : new Error(String(err))); - }); - return { promise, resolve, reject }; -} - -function safeErr(err: unknown): Error { - return err instanceof Error ? err : new Error(String(err)); -} - function safeDetail(err: unknown): string { if (err instanceof Error) return `${err.name}: ${err.message}`; return String(err);