From a58d9690286f80c71d004a4aa895cac4977f1dc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Zieli=C5=84ski?= Date: Fri, 26 Sep 2025 15:37:48 +0100 Subject: [PATCH 1/4] [Client] Enable calling playground.runStream() and .cli() from the client --- packages/php-wasm/universal/src/lib/api.ts | 306 ++++++++++++++++++++- 1 file changed, 305 insertions(+), 1 deletion(-) diff --git a/packages/php-wasm/universal/src/lib/api.ts b/packages/php-wasm/universal/src/lib/api.ts index 345553e7a2..4e221d4a9a 100644 --- a/packages/php-wasm/universal/src/lib/api.ts +++ b/packages/php-wasm/universal/src/lib/api.ts @@ -1,5 +1,5 @@ import type { PHPResponseData } from './php-response'; -import { PHPResponse } from './php-response'; +import { PHPResponse, StreamedPHPResponse } from './php-response'; import * as Comlink from './comlink-sync'; import { NodeSABSyncReceiveMessageTransport, @@ -240,6 +240,310 @@ function setupTransferHandlers() { } return serialized; }; + + Comlink.transferHandlers.set('StreamedPHPResponse', { + canHandle: (obj: unknown): obj is StreamedPHPResponse => + obj instanceof StreamedPHPResponse, + serialize(obj: StreamedPHPResponse): [any, Transferable[]] { + const supportsStreams = supportsTransferableStreams(); + const exitCodePort = promiseToPort(obj.exitCode); + if (supportsStreams) { + const payload = { + __type: 'StreamedPHPResponse', + headers: (obj as any)['headersStream'], + stdout: obj.stdout, + stderr: obj.stderr, + exitCodePort, + }; + return [payload, [exitCodePort]]; + } + // Fallback: bridge streams via MessagePorts + const headersPort = streamToPort((obj as any)['headersStream']); + const stdoutPort = streamToPort(obj.stdout); + const stderrPort = streamToPort(obj.stderr); + const payload = { + __type: 'StreamedPHPResponse', + headersPort, + stdoutPort, + stderrPort, + exitCodePort, + }; + return [ + payload, + [headersPort, stdoutPort, stderrPort, exitCodePort], + ]; + }, + deserialize(data: any): StreamedPHPResponse { + if (data.headers && data.stdout && data.stderr) { + const exitCode = portToPromise( + data.exitCodePort as MessagePort + ); + return new StreamedPHPResponse( + data.headers as ReadableStream, + data.stdout as ReadableStream, + data.stderr as ReadableStream, + exitCode + ); + } + const headers = portToStream(data.headersPort as MessagePort); + const stdout = portToStream(data.stdoutPort as MessagePort); + const stderr = portToStream(data.stderrPort as MessagePort); + const exitCode = portToPromise(data.exitCodePort as MessagePort); + return new StreamedPHPResponse(headers, stdout, stderr, exitCode); + }, + }); +} + +// Utilities for transferring ReadableStreams and Promises via MessagePorts: + +/** + * Safari does not support transferable streams, so we need to fallback to + * MessagePorts. + * Feature-detects whether this runtime supports transferring ReadableStreams + * directly through postMessage (aka "transferable streams"). When false, + * we must fall back to port-bridged streaming. + */ +function supportsTransferableStreams(): boolean { + try { + if (typeof ReadableStream === 'undefined') return false; + const { port1 } = new MessageChannel(); + const rs = new ReadableStream(); + port1.postMessage(rs as any); + try { + port1.close(); + } catch (_e) { + void _e; + } + return true; + } catch (_e) { + void _e; + return false; + } +} + +/** + * Bridges a ReadableStream to a MessagePort by reading chunks and posting + * messages to the port. Used as a fallback when transferable streams are not + * supported (e.g., Safari). + * + * Protocol of the returned MessagePort: + * + * { t: 'chunk', b: ArrayBuffer } – next binary chunk + * { t: 'close' } – end of stream + * { t: 'error', m: string } – terminal error + */ +function streamToPort(stream: ReadableStream): MessagePort { + const { port1, port2 } = new MessageChannel(); + (async () => { + const reader = stream.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) { + try { + port1.postMessage({ t: 'close' }); + } catch { + // Ignore + } + try { + port1.close(); + } catch { + // Ignore + } + break; + } + if (value) { + // Ensure we transfer an owned buffer + const owned = + value.byteOffset === 0 && + value.byteLength === value.buffer.byteLength + ? value + : value.slice(); + const buf = owned.buffer; + try { + port1.postMessage({ t: 'chunk', b: buf }, [ + buf as unknown as Transferable, + ]); + } catch { + port1.postMessage({ + t: 'chunk', + b: owned.buffer.slice(0), + }); + } + } + } + } catch (e: any) { + try { + port1.postMessage({ t: 'error', m: e?.message || String(e) }); + } catch { + // Ignore + } + } finally { + try { + port1.close(); + } catch { + // Ignore + } + } + })(); + return port2; +} + +/** + * Reconstructs a ReadableStream from a MessagePort using the inverse of the + * streamToPort protocol. Each message enqueues data, closes, or errors. + */ +function portToStream(port: MessagePort): ReadableStream { + return new ReadableStream({ + start(controller) { + const onMessage = (ev: MessageEvent) => { + const data: any = (ev as any).data; + if (!data) return; + switch (data.t) { + case 'chunk': + controller.enqueue(new Uint8Array(data.b)); + break; + case 'close': + controller.close(); + cleanup(); + break; + case 'error': + controller.error(new Error(data.m || 'Stream error')); + cleanup(); + break; + } + }; + const cleanup = () => { + try { + (port as any).removeEventListener?.( + 'message', + onMessage as any + ); + } catch { + // Ignore + } + try { + (port as any).onmessage = null; + } catch { + // Ignore + } + try { + port.close(); + } catch { + // Ignore + } + }; + if ((port as any).addEventListener) { + (port as any).addEventListener('message', onMessage as any); + } else if ((port as any).on) { + (port as any).on('message', (data: any) => + onMessage({ data } as any) + ); + } else { + (port as any).onmessage = onMessage as any; + } + if (typeof (port as any).start === 'function') { + (port as any).start(); + } + }, + cancel() { + try { + port.close(); + } catch { + // Ignore + } + }, + }); +} + +/** + * Bridges a Promise to a MessagePort so it can be delivered across threads. + * + * Protocol of the returned MessagePort: + * + * { t: 'resolve', v: any } – promise resolved with value v + * { t: 'reject', m: str } – promise rejected with message m + */ +function promiseToPort(promise: Promise): MessagePort { + const { port1, port2 } = new MessageChannel(); + promise + .then((value) => { + try { + port1.postMessage({ t: 'resolve', v: value }); + } catch { + // Ignore + } + }) + .catch((err) => { + try { + port1.postMessage({ + t: 'reject', + m: (err as any)?.message || String(err), + }); + } catch { + // Ignore + } + }) + .finally(() => { + try { + port1.close(); + } catch { + // Ignore + } + }); + return port2; +} + +/** + * Reconstructs a Promise from a MessagePort using the inverse of + * promiseToPort. Resolves or rejects when the corresponding message arrives. + */ +function portToPromise(port: MessagePort): Promise { + return new Promise((resolve, reject) => { + const onMessage = (ev: MessageEvent) => { + const data: any = (ev as any).data; + if (!data) return; + if (data.t === 'resolve') { + cleanup(); + resolve(data.v); + } else if (data.t === 'reject') { + cleanup(); + reject(new Error(data.m || '')); + } + }; + const cleanup = () => { + try { + (port as any).removeEventListener?.( + 'message', + onMessage as any + ); + } catch { + // Ignore + } + try { + (port as any).onmessage = null; + } catch { + // Ignore + } + try { + port.close(); + } catch { + // Ignore + } + }; + if ((port as any).addEventListener) { + (port as any).addEventListener('message', onMessage as any); + } else if ((port as any).on) { + (port as any).on('message', (data: any) => + onMessage({ data } as any) + ); + } else { + (port as any).onmessage = onMessage as any; + } + if (typeof (port as any).start === 'function') { + (port as any).start(); + } + }); } // Augment Comlink's throw handler to include all the information carried by From c4412eb9d91a3f0753b160a67fd1381be7d5d2ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Zieli=C5=84ski?= Date: Fri, 26 Sep 2025 15:43:26 +0100 Subject: [PATCH 2/4] format --- packages/php-wasm/universal/src/lib/api.ts | 62 ++++++++++------------ 1 file changed, 28 insertions(+), 34 deletions(-) diff --git a/packages/php-wasm/universal/src/lib/api.ts b/packages/php-wasm/universal/src/lib/api.ts index 4e221d4a9a..4112c2a544 100644 --- a/packages/php-wasm/universal/src/lib/api.ts +++ b/packages/php-wasm/universal/src/lib/api.ts @@ -343,12 +343,12 @@ function streamToPort(stream: ReadableStream): MessagePort { try { port1.postMessage({ t: 'close' }); } catch { - // Ignore + // Ignore error } try { port1.close(); } catch { - // Ignore + // Ignore error } break; } @@ -376,13 +376,13 @@ function streamToPort(stream: ReadableStream): MessagePort { try { port1.postMessage({ t: 'error', m: e?.message || String(e) }); } catch { - // Ignore + // Ignore error } } finally { try { port1.close(); } catch { - // Ignore + // Ignore error } } })(); @@ -415,42 +415,39 @@ function portToStream(port: MessagePort): ReadableStream { }; const cleanup = () => { try { - (port as any).removeEventListener?.( - 'message', - onMessage as any - ); + port.removeEventListener?.('message', onMessage as any); } catch { - // Ignore + // Ignore error } try { - (port as any).onmessage = null; + port.onmessage = null; } catch { - // Ignore + // Ignore error } try { port.close(); } catch { - // Ignore + // Ignore error } }; - if ((port as any).addEventListener) { - (port as any).addEventListener('message', onMessage as any); + if (port.addEventListener) { + port.addEventListener('message', onMessage as any); } else if ((port as any).on) { (port as any).on('message', (data: any) => onMessage({ data } as any) ); } else { - (port as any).onmessage = onMessage as any; + port.onmessage = onMessage as any; } - if (typeof (port as any).start === 'function') { - (port as any).start(); + if (typeof port.start === 'function') { + port.start(); } }, cancel() { try { port.close(); } catch { - // Ignore + // Ignore error } }, }); @@ -471,7 +468,7 @@ function promiseToPort(promise: Promise): MessagePort { try { port1.postMessage({ t: 'resolve', v: value }); } catch { - // Ignore + // Ignore error } }) .catch((err) => { @@ -481,14 +478,14 @@ function promiseToPort(promise: Promise): MessagePort { m: (err as any)?.message || String(err), }); } catch { - // Ignore + // Ignore error } }) .finally(() => { try { port1.close(); } catch { - // Ignore + // Ignore error } }); return port2; @@ -513,35 +510,32 @@ function portToPromise(port: MessagePort): Promise { }; const cleanup = () => { try { - (port as any).removeEventListener?.( - 'message', - onMessage as any - ); + port.removeEventListener?.('message', onMessage as any); } catch { - // Ignore + // Ignore error } try { - (port as any).onmessage = null; + port.onmessage = null; } catch { - // Ignore + // Ignore error } try { port.close(); } catch { - // Ignore + // Ignore error } }; - if ((port as any).addEventListener) { - (port as any).addEventListener('message', onMessage as any); + if (port.addEventListener) { + port.addEventListener('message', onMessage as any); } else if ((port as any).on) { (port as any).on('message', (data: any) => onMessage({ data } as any) ); } else { - (port as any).onmessage = onMessage as any; + port.onmessage = onMessage as any; } - if (typeof (port as any).start === 'function') { - (port as any).start(); + if (typeof port.start === 'function') { + port.start(); } }); } From e4cb9729f623aea074bd173e9454dbfa25240a94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Zieli=C5=84ski?= Date: Fri, 26 Sep 2025 16:20:52 +0100 Subject: [PATCH 3/4] Add e2e tess --- .../website/playwright/e2e/client.spec.ts | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 packages/playground/website/playwright/e2e/client.spec.ts diff --git a/packages/playground/website/playwright/e2e/client.spec.ts b/packages/playground/website/playwright/e2e/client.spec.ts new file mode 100644 index 0000000000..b2a4d71c5f --- /dev/null +++ b/packages/playground/website/playwright/e2e/client.spec.ts @@ -0,0 +1,36 @@ +import { test, expect } from '../playground-fixtures.ts'; + +test('playground.cli() streams stdout', async ({ website }) => { + await website.goto('./'); + // Ensure the Playground client is connected and exposed on window + await website.page.waitForFunction(() => + Boolean((window as any).playground) + ); + + const output = await website.page.evaluate(async () => { + const playground = (window as any).playground; + await playground.writeFile('/tmp/script.php', " { + await website.goto('./'); + // Ensure the Playground client is connected and exposed on window + await website.page.waitForFunction(() => + Boolean((window as any).playground) + ); + + const output = await website.page.evaluate(async () => { + const playground = (window as any).playground; + const streamed = await playground.__internal_getPHP().runStream({ + code: " Date: Fri, 26 Sep 2025 17:24:10 +0100 Subject: [PATCH 4/4] remove runStream() test --- .../website/playwright/e2e/client.spec.ts | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/packages/playground/website/playwright/e2e/client.spec.ts b/packages/playground/website/playwright/e2e/client.spec.ts index b2a4d71c5f..384a83582b 100644 --- a/packages/playground/website/playwright/e2e/client.spec.ts +++ b/packages/playground/website/playwright/e2e/client.spec.ts @@ -16,21 +16,3 @@ test('playground.cli() streams stdout', async ({ website }) => { await expect(output).toContain('hi!'); }); - -test('playground.runStream() streams stdout', async ({ website }) => { - await website.goto('./'); - // Ensure the Playground client is connected and exposed on window - await website.page.waitForFunction(() => - Boolean((window as any).playground) - ); - - const output = await website.page.evaluate(async () => { - const playground = (window as any).playground; - const streamed = await playground.__internal_getPHP().runStream({ - code: "