Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 84 additions & 7 deletions components/Application.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { spawn } from 'node:child_process';
import { createReadStream, existsSync, readdirSync } from 'node:fs';
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { StringDecoder } from 'node:string_decoder';

import { extract } from 'tar-fs';
import gunzip from 'gunzip-maybe';
Expand Down Expand Up @@ -306,12 +307,16 @@ export async function installApplication(application: Application) {
// If custom install command is specified, run it
if (application.install?.command) {
const [command, ...args] = application.install.command.split(' ');
const customOnLine = application.onInstallLine
? (stream: 'stdout' | 'stderr', line: string) => application.onInstallLine!(command, stream, line)
: undefined;
const { stdout, stderr, code } = await nonInteractiveSpawn(
application.name,
command,
args,
application.dirPath,
application.install?.timeout
application.install?.timeout,
customOnLine
);
// if it succeeds, return
if (code === 0) {
Expand Down Expand Up @@ -362,12 +367,16 @@ export async function installApplication(application: Application) {
// Would result in `pnpm@7` being used as the executable.
// Important note: an `npm` version should not be specifiable; the only valid npm version is the one installed alongside Node.js

const pmOnLine = application.onInstallLine
? (stream: 'stdout' | 'stderr', line: string) => application.onInstallLine!(packageManager.name, stream, line)
: undefined;
const { stdout, stderr, code } = await nonInteractiveSpawn(
application.name,
(application.packageManagerPrefix ? application.packageManagerPrefix + ' ' : '') + packageManager.name,
application.install?.allowInstallScripts ? ['install'] : ['install', '--ignore-scripts'], // All of `npm`, `yarn`, and `pnpm` support the `install` command. If we need to configure options here we may have to use some other defaults though
application.dirPath,
application.install?.timeout
application.install?.timeout,
pmOnLine
);

// if it succeeds, return
Expand Down Expand Up @@ -414,12 +423,16 @@ export async function installApplication(application: Application) {
const npmInstallArgs = application.install?.allowInstallScripts
? ['install', '--force']
: ['install', '--force', '--ignore-scripts'];
const npmOnLine = application.onInstallLine
? (stream: 'stdout' | 'stderr', line: string) => application.onInstallLine!('npm', stream, line)
: undefined;
const { stdout, stderr, code } = await nonInteractiveSpawn(
application.name,
(application.packageManagerPrefix ? application.packageManagerPrefix + ' ' : '') + 'npm',
npmInstallArgs,
application.dirPath,
application.install?.timeout
application.install?.timeout,
npmOnLine
);

// if it succeeds, return
Expand All @@ -440,27 +453,39 @@ export async function installApplication(application: Application) {
throw new Error(`Failed to install dependencies for ${application.name} using npm default. Exit code: ${code}`);
}

/**
* Slice B2: callback invoked once per complete line of install stdout/stderr from
* `nonInteractiveSpawn`. Threaded through `installApplication` to the underlying spawn
* so a deploy can stream `npm install` output back to the caller as an SSE `install`
* event in real time, rather than waiting for the process to exit. Line-buffered so a
* chunk that splits mid-line never fires a partial line.
*/
export type OnInstallLine = (manager: string, stream: 'stdout' | 'stderr', line: string) => void;

interface ApplicationOptions {
name: string;
payload?: Buffer | string | Readable;
packageIdentifier?: string;
install?: { command?: string; timeout?: number; allowInstallScripts?: boolean };
onInstallLine?: OnInstallLine;
}

export class Application {
name: string;
payload?: Buffer | string | Readable;
packageIdentifier?: string;
install?: { command?: string; timeout?: number; allowInstallScripts?: boolean };
onInstallLine?: OnInstallLine;
dirPath: string;
logger: Logger;
packageManagerPrefix: string; // can be used to configure a package manager prefix, specifically "sfw".

constructor({ name, payload, packageIdentifier, install }: ApplicationOptions) {
constructor({ name, payload, packageIdentifier, install, onInstallLine }: ApplicationOptions) {
this.name = name;
this.payload = payload;
this.packageIdentifier = packageIdentifier && derivePackageIdentifier(packageIdentifier);
this.install = install;
this.onInstallLine = onInstallLine;
const componentsRoot = getConfigPath(CONFIG_PARAMS.COMPONENTSROOT);
if (!componentsRoot) throw new Error('componentsRoot is not configured');
this.dirPath = join(componentsRoot, name);
Expand Down Expand Up @@ -612,12 +637,53 @@ function getGitSSHCommand() {
* @param timeoutMs The timeout for the command in milliseconds. Defaults to 5 minutes.
* @returns A promise that resolves when the command completes.
*/
/**
* Slice B2: line-buffered split that emits complete `\n`-terminated lines as they
* arrive, holding any partial trailing fragment until the next chunk or `flush()`.
* Required because `child_process` stdout/stderr `'data'` events fire per OS-level
* chunk, with no guarantee a chunk ends on a newline — without buffering, a long
* `npm install` line could be reported to the caller as two halves.
*
* Uses StringDecoder so a multi-byte UTF-8 character (e.g. the ✔ emoji npm prints
* for resolved packages) split across two chunks is reassembled into a single code
* point rather than each half being decoded as replacement characters.
*/
function createLineSplitter(onLine: (line: string) => void): {
push: (chunk: Buffer | string) => void;
flush: () => void;
} {
const decoder = new StringDecoder('utf8');
let pending = '';
return {
push(chunk) {
pending += typeof chunk === 'string' ? chunk : decoder.write(chunk);
let nl: number;
while ((nl = pending.indexOf('\n')) !== -1) {
const line = pending.slice(0, nl).replace(/\r$/, '');
pending = pending.slice(nl + 1);
onLine(line);
}
},
flush() {
// Drain any bytes the decoder is still holding (e.g. a multi-byte char that
// straddled the final chunk boundary).
const remaining = decoder.end();
if (remaining) pending += remaining;
if (pending.length > 0) {
onLine(pending);
pending = '';
}
},
};
}

export function nonInteractiveSpawn(
applicationName: string,
command: string,
args: string[],
cwd: string,
timeoutMs: number = 60 * 60 * 1000
timeoutMs: number = 60 * 60 * 1000,
onLine?: (stream: 'stdout' | 'stderr', line: string) => void
): Promise<{ stdout: string; stderr: string; code: number }> {
return new Promise((resolve, reject) => {
logger
Expand Down Expand Up @@ -647,24 +713,31 @@ export function nonInteractiveSpawn(
reject(new Error(`Command\`${command} ${args.join(' ')}\` timed out after ${timeoutMs}ms`));
}, timeoutMs);

// Slice B2: if a caller passed onLine, line-buffer stdout/stderr alongside the
// existing string accumulation so we never report a half-line.
const stdoutSplitter = onLine ? createLineSplitter((line) => onLine('stdout', line)) : null;
const stderrSplitter = onLine ? createLineSplitter((line) => onLine('stderr', line)) : null;

let stdout = '';
childProcess.stdout.on('data', (chunk) => {
// buffer stdout for later resolve
stdout += chunk.toString();
// log stdout lines immediately
// TODO: Technically nothing guarantees that a chunk will be a complete line so need to implement
// something here to buffer until a newline character, then log the complete line
logger.loggerWithTag(`${applicationName}:spawn:${command}:stdout`).debug?.(chunk.toString());
stdoutSplitter?.push(chunk);
});

// buffer stderr
let stderr = '';
childProcess.stderr.on('data', (chunk) => {
stderr += chunk.toString();
stderrSplitter?.push(chunk);
});

childProcess.on('error', (error) => {
clearTimeout(timeout);
stdoutSplitter?.flush();
stderrSplitter?.flush();
// Print out stderr before rejecting
if (stderr) {
printStd(applicationName, command, stderr, 'stderr');
Expand All @@ -674,6 +747,10 @@ export function nonInteractiveSpawn(

childProcess.on('close', (code) => {
clearTimeout(timeout);
// Flush any trailing partial lines so the caller sees process output that didn't
// end on a newline (some package managers do this on their final progress line).
stdoutSplitter?.flush();
stderrSplitter?.flush();
if (stderr) {
printStd(applicationName, command, stderr, 'stderr');
}
Expand Down
100 changes: 100 additions & 0 deletions components/deploymentRecorder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ export class DeploymentRecorder {
private unsubscribe: (() => void) | null = null;
private pendingPut: Promise<void> | null = null;
private dirty = false;
// Slice B2: peer outcomes are stashed here by recordPeers and applied inside finish()
// so the terminal put always carries them, avoiding a race with concurrent
// emitter-triggered puts that might otherwise overwrite peer_results with their
// pre-mutation snapshot of the record.
private pendingPeerResults: unknown[] | null = null;

private constructor(deploymentId: string, initial: Record<string, any>) {
this.deploymentId = deploymentId;
Expand Down Expand Up @@ -225,6 +230,28 @@ export class DeploymentRecorder {
await this.put();
}

/**
* Slice B2: write per-peer results back to the origin row after `replicateOperation`
* returns. The replication layer returns an opaque array of per-peer outcomes; we
* normalize them here to `{node, status, error?, started_at, completed_at}` and write
* once. Tolerates unknown shapes — anything we can't interpret becomes a plain
* stringified entry so the audit trail at least records that a peer was contacted.
*/
// eslint-disable-next-line @typescript-eslint/require-await
async recordPeers(results: unknown): Promise<void> {
if (this.finished) return;
if (!Array.isArray(results)) return;
// Stash for the terminal finish() put rather than writing immediately. A separate
// put here races with the coalesced emitter-triggered puts (each captures the
// in-memory record as it's serialized) and can lose peer_results when an earlier
// put's later-completing write overwrites our row. finish() bundles peer_results
// with the status=success/failed transition into one put, eliminating the race.
this.pendingPeerResults = results;
// Also update the in-memory record so any get_deployment SSE replay or other read
// before finish() sees the latest peer outcomes.
this.record.peer_results = results.map(normalizePeerResult);
}

async finish(status: 'success' | 'failed' | 'rolled_back', error?: unknown): Promise<void> {
if (this.finished) return;
// Send a terminal sentinel through the emitter (if any) BEFORE we unsubscribe and
Expand All @@ -246,6 +273,12 @@ export class DeploymentRecorder {
}
this.record.status = status;
this.record.completed_at = Date.now();
// Slice B2: re-apply any stashed peer outcomes right before the terminal put so they
// are bundled with the status transition and can't be lost to a put race.
if (this.pendingPeerResults) {
this.record.peer_results = this.pendingPeerResults.map(normalizePeerResult);
this.pendingPeerResults = null;
}
if (error) {
const e = error as { message?: string; code?: string | number; stack?: string };
this.record.error = {
Expand All @@ -272,6 +305,73 @@ export class DeploymentRecorder {
}
}

/**
* Slice B2: peer-side helper — wait for the hdb_deployment row to arrive via table
* replication, then return it. The row is committed on origin before `replicateOperation`
* is called, so peers normally find it immediately; this polling loop is for the rare
* case where the operation arrives faster than the table-replication channel.
*
* The payload_blob's chunks may still be in flight after the row arrives — that's fine,
* the Blob's `stream()` / `bytes()` API blocks on incomplete writes (resources/blob.ts).
*/
export async function awaitDeploymentRow(
deploymentId: string,
options: { timeoutMs?: number; pollIntervalMs?: number; initialPollIntervalMs?: number } = {}
): Promise<Record<string, any>> {
const timeoutMs = options.timeoutMs ?? 30_000;
const maxIntervalMs = options.pollIntervalMs ?? 100;
// Start fast (5ms) so the common case — replication has already caught up — sees no
// human-noticeable latency, then back off exponentially up to maxIntervalMs for the
// rare case where the row is genuinely still replicating.
let intervalMs = options.initialPollIntervalMs ?? 5;
const table = (databases as any).system?.[terms.SYSTEM_TABLE_NAMES.DEPLOYMENT_TABLE_NAME];
if (!table) {
throw new Error(
`Deployment tracking is not initialized on this node (system.${terms.SYSTEM_TABLE_NAMES.DEPLOYMENT_TABLE_NAME} missing).`
);
}
const deadline = Date.now() + timeoutMs;
let lastError: unknown;
while (Date.now() < deadline) {
try {
const row = await table.get(deploymentId);
if (row && row.payload_blob != null) return row;
} catch (err) {
lastError = err;
}
await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
intervalMs = Math.min(intervalMs * 2, maxIntervalMs);
}
throw new Error(
`Timed out after ${timeoutMs}ms waiting for hdb_deployment row '${deploymentId}' to replicate` +
(lastError ? ` (last error: ${(lastError as Error).message ?? lastError})` : '')
);
}

function normalizePeerResult(raw: unknown): Record<string, unknown> {
if (!raw || typeof raw !== 'object') {
// Replication layer returned a primitive — preserve as a stringified marker so the
// audit row at least records that something came back from a peer.
return { node: null, status: 'unknown', raw: String(raw) };
}
const r = raw as Record<string, unknown>;
const err = r.error;
const hasError =
err != null && (typeof err === 'string' ? err.length > 0 : typeof err === 'object' || typeof err === 'number');
return {
node: r.node ?? r.name ?? r.hostname ?? null,
status: hasError ? 'failed' : (r.status ?? 'success'),
error: hasError
? {
message: typeof err === 'object' ? ((err as any).message ?? String(err)) : String(err),
code: typeof err === 'object' ? (err as any).code : undefined,
}
: null,
started_at: r.started_at ?? null,
completed_at: r.completed_at ?? null,
};
}

function startStatusFor(phase: string | undefined): DeploymentStatus | null {
switch (phase) {
case 'extract':
Expand Down
32 changes: 25 additions & 7 deletions components/operations.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const { packageDirectory } = require('../components/packageComponent.ts');
const { Resources } = require('../resources/Resources.ts');
const { Application, prepareApplication } = require('./Application.ts');
const { server } = require('../server/Server.ts');
const { DeploymentRecorder } = require('./deploymentRecorder.ts');
const { DeploymentRecorder, awaitDeploymentRow } = require('./deploymentRecorder.ts');
const { ProgressEmitter } = require('../server/serverHelpers/progressEmitter.ts');

/**
Expand Down Expand Up @@ -417,12 +417,19 @@ async function deployComponent(req) {
try {
// On the origin, tee the tarball (Buffer or Readable from the multipart parser)
// through a hash-and-size tap into the row's payload_blob, then re-source extraction
// from the persisted blob. This is the staging area and (in Slice B) the channel
// peers will replicate from. On peer nodes we skip recording entirely and use the
// raw payload as-is.
// from the persisted blob. The blob is the channel peers read from in Slice B2.
if (recorder && req.payload != null) {
await recorder.ingestPayload(req.payload);
extractionPayload = recorder.row.payload_blob.stream();
} else if (isReplicatedExecution && req.payload == null && !req.package) {
// Slice B2 of #641: peer-side blob read. Origin stripped req.payload before
// replicateOperation; the tarball travels via the replicated hdb_deployment row's
// payload_blob attribute instead. Wait for the row to arrive on this node, then
// stream the blob — Blob.stream() handles in-flight BLOB_CHUNK writes by blocking
// until the chunks land. If the row never replicates within the timeout, peer
// records a failure and origin will see it in peer_results.
const row = await awaitDeploymentRow(req._deploymentId);
extractionPayload = row.payload_blob.stream();
}

const application = new Application({
Expand All @@ -434,6 +441,11 @@ async function deployComponent(req) {
timeout: req.install_timeout,
allowInstallScripts: req.install_allow_scripts,
},
// Slice B2: forward each complete line of install stdout/stderr to the SSE channel
// (and into the recorder's event_log via the same subscriber). Peers have no
// emitter — their install output goes to the local logger only; cross-node install
// streaming is intentionally out of scope for B2.
onInstallLine: emitter ? (manager, stream, line) => emit('install', { manager, stream, line }) : undefined,
});

emit('phase', { phase: 'prepare', status: 'start' });
Expand Down Expand Up @@ -466,13 +478,19 @@ async function deployComponent(req) {
const rollingRestart = req.restart === 'rolling';
// if doing a rolling restart set restart to false so that other nodes don't also restart.
req.restart = rollingRestart ? false : req.restart;
// ProgressEmitter holds function listeners that can't survive the replication
// channel's serialization, and the recorder is local to origin anyway. Strip both
// before sending so peers see a clean req.
// Strip transport-only fields that don't survive the replication channel and aren't
// meaningful to peers. The payload travels via the replicated hdb_deployment row's
// payload_blob attribute (Slice B2), so peers don't need req.payload at all — they
// look the row up by deployment_id. req._deploymentId is intentionally KEPT; it is
// the handoff that lets peers find the replicated row.
delete req.progress;
delete req.payload;
emit('phase', { phase: 'replicate', status: 'start' });
let response = await server.replication.replicateOperation(req);
emit('phase', { phase: 'replicate', status: 'done' });
if (recorder && response?.replicated) {
await recorder.recordPeers(response.replicated);
}
if (req.restart === true) {
emit('phase', { phase: 'restart', status: 'start' });
manageThreads.restartWorkers('http');
Expand Down
Loading
Loading