diff --git a/packages/php-wasm/universal/src/lib/api.ts b/packages/php-wasm/universal/src/lib/api.ts index 345553e7a2..4112c2a544 100644 --- a/packages/php-wasm/universal/src/lib/api.ts +++ b/packages/php-wasm/universal/src/lib/api.ts @@ -1,5 +1,5 @@ import type { PHPResponseData } from './php-response'; -import { PHPResponse } from './php-response'; +import { PHPResponse, StreamedPHPResponse } from './php-response'; import * as Comlink from './comlink-sync'; import { NodeSABSyncReceiveMessageTransport, @@ -240,6 +240,304 @@ function setupTransferHandlers() { } return serialized; }; + + Comlink.transferHandlers.set('StreamedPHPResponse', { + canHandle: (obj: unknown): obj is StreamedPHPResponse => + obj instanceof StreamedPHPResponse, + serialize(obj: StreamedPHPResponse): [any, Transferable[]] { + const supportsStreams = supportsTransferableStreams(); + const exitCodePort = promiseToPort(obj.exitCode); + if (supportsStreams) { + const payload = { + __type: 'StreamedPHPResponse', + headers: (obj as any)['headersStream'], + stdout: obj.stdout, + stderr: obj.stderr, + exitCodePort, + }; + return [payload, [exitCodePort]]; + } + // Fallback: bridge streams via MessagePorts + const headersPort = streamToPort((obj as any)['headersStream']); + const stdoutPort = streamToPort(obj.stdout); + const stderrPort = streamToPort(obj.stderr); + const payload = { + __type: 'StreamedPHPResponse', + headersPort, + stdoutPort, + stderrPort, + exitCodePort, + }; + return [ + payload, + [headersPort, stdoutPort, stderrPort, exitCodePort], + ]; + }, + deserialize(data: any): StreamedPHPResponse { + if (data.headers && data.stdout && data.stderr) { + const exitCode = portToPromise( + data.exitCodePort as MessagePort + ); + return new StreamedPHPResponse( + data.headers as ReadableStream, + data.stdout as ReadableStream, + data.stderr as ReadableStream, + exitCode + ); + } + const headers = portToStream(data.headersPort as MessagePort); + const stdout = portToStream(data.stdoutPort as MessagePort); + const stderr = portToStream(data.stderrPort as MessagePort); + const exitCode = portToPromise(data.exitCodePort as MessagePort); + return new StreamedPHPResponse(headers, stdout, stderr, exitCode); + }, + }); +} + +// Utilities for transferring ReadableStreams and Promises via MessagePorts: + +/** + * Safari does not support transferable streams, so we need to fallback to + * MessagePorts. + * Feature-detects whether this runtime supports transferring ReadableStreams + * directly through postMessage (aka "transferable streams"). When false, + * we must fall back to port-bridged streaming. + */ +function supportsTransferableStreams(): boolean { + try { + if (typeof ReadableStream === 'undefined') return false; + const { port1 } = new MessageChannel(); + const rs = new ReadableStream(); + port1.postMessage(rs as any); + try { + port1.close(); + } catch (_e) { + void _e; + } + return true; + } catch (_e) { + void _e; + return false; + } +} + +/** + * Bridges a ReadableStream to a MessagePort by reading chunks and posting + * messages to the port. Used as a fallback when transferable streams are not + * supported (e.g., Safari). + * + * Protocol of the returned MessagePort: + * + * { t: 'chunk', b: ArrayBuffer } – next binary chunk + * { t: 'close' } – end of stream + * { t: 'error', m: string } – terminal error + */ +function streamToPort(stream: ReadableStream): MessagePort { + const { port1, port2 } = new MessageChannel(); + (async () => { + const reader = stream.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) { + try { + port1.postMessage({ t: 'close' }); + } catch { + // Ignore error + } + try { + port1.close(); + } catch { + // Ignore error + } + break; + } + if (value) { + // Ensure we transfer an owned buffer + const owned = + value.byteOffset === 0 && + value.byteLength === value.buffer.byteLength + ? value + : value.slice(); + const buf = owned.buffer; + try { + port1.postMessage({ t: 'chunk', b: buf }, [ + buf as unknown as Transferable, + ]); + } catch { + port1.postMessage({ + t: 'chunk', + b: owned.buffer.slice(0), + }); + } + } + } + } catch (e: any) { + try { + port1.postMessage({ t: 'error', m: e?.message || String(e) }); + } catch { + // Ignore error + } + } finally { + try { + port1.close(); + } catch { + // Ignore error + } + } + })(); + return port2; +} + +/** + * Reconstructs a ReadableStream from a MessagePort using the inverse of the + * streamToPort protocol. Each message enqueues data, closes, or errors. + */ +function portToStream(port: MessagePort): ReadableStream { + return new ReadableStream({ + start(controller) { + const onMessage = (ev: MessageEvent) => { + const data: any = (ev as any).data; + if (!data) return; + switch (data.t) { + case 'chunk': + controller.enqueue(new Uint8Array(data.b)); + break; + case 'close': + controller.close(); + cleanup(); + break; + case 'error': + controller.error(new Error(data.m || 'Stream error')); + cleanup(); + break; + } + }; + const cleanup = () => { + try { + port.removeEventListener?.('message', onMessage as any); + } catch { + // Ignore error + } + try { + port.onmessage = null; + } catch { + // Ignore error + } + try { + port.close(); + } catch { + // Ignore error + } + }; + if (port.addEventListener) { + port.addEventListener('message', onMessage as any); + } else if ((port as any).on) { + (port as any).on('message', (data: any) => + onMessage({ data } as any) + ); + } else { + port.onmessage = onMessage as any; + } + if (typeof port.start === 'function') { + port.start(); + } + }, + cancel() { + try { + port.close(); + } catch { + // Ignore error + } + }, + }); +} + +/** + * Bridges a Promise to a MessagePort so it can be delivered across threads. + * + * Protocol of the returned MessagePort: + * + * { t: 'resolve', v: any } – promise resolved with value v + * { t: 'reject', m: str } – promise rejected with message m + */ +function promiseToPort(promise: Promise): MessagePort { + const { port1, port2 } = new MessageChannel(); + promise + .then((value) => { + try { + port1.postMessage({ t: 'resolve', v: value }); + } catch { + // Ignore error + } + }) + .catch((err) => { + try { + port1.postMessage({ + t: 'reject', + m: (err as any)?.message || String(err), + }); + } catch { + // Ignore error + } + }) + .finally(() => { + try { + port1.close(); + } catch { + // Ignore error + } + }); + return port2; +} + +/** + * Reconstructs a Promise from a MessagePort using the inverse of + * promiseToPort. Resolves or rejects when the corresponding message arrives. + */ +function portToPromise(port: MessagePort): Promise { + return new Promise((resolve, reject) => { + const onMessage = (ev: MessageEvent) => { + const data: any = (ev as any).data; + if (!data) return; + if (data.t === 'resolve') { + cleanup(); + resolve(data.v); + } else if (data.t === 'reject') { + cleanup(); + reject(new Error(data.m || '')); + } + }; + const cleanup = () => { + try { + port.removeEventListener?.('message', onMessage as any); + } catch { + // Ignore error + } + try { + port.onmessage = null; + } catch { + // Ignore error + } + try { + port.close(); + } catch { + // Ignore error + } + }; + if (port.addEventListener) { + port.addEventListener('message', onMessage as any); + } else if ((port as any).on) { + (port as any).on('message', (data: any) => + onMessage({ data } as any) + ); + } else { + port.onmessage = onMessage as any; + } + if (typeof port.start === 'function') { + port.start(); + } + }); } // Augment Comlink's throw handler to include all the information carried by diff --git a/packages/playground/website/playwright/e2e/client.spec.ts b/packages/playground/website/playwright/e2e/client.spec.ts new file mode 100644 index 0000000000..384a83582b --- /dev/null +++ b/packages/playground/website/playwright/e2e/client.spec.ts @@ -0,0 +1,18 @@ +import { test, expect } from '../playground-fixtures.ts'; + +test('playground.cli() streams stdout', async ({ website }) => { + await website.goto('./'); + // Ensure the Playground client is connected and exposed on window + await website.page.waitForFunction(() => + Boolean((window as any).playground) + ); + + const output = await website.page.evaluate(async () => { + const playground = (window as any).playground; + await playground.writeFile('/tmp/script.php', "