Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
300 changes: 299 additions & 1 deletion packages/php-wasm/universal/src/lib/api.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { PHPResponseData } from './php-response';
import { PHPResponse } from './php-response';
import { PHPResponse, StreamedPHPResponse } from './php-response';
import * as Comlink from './comlink-sync';
import {
NodeSABSyncReceiveMessageTransport,
Expand Down Expand Up @@ -240,6 +240,304 @@ function setupTransferHandlers() {
}
return serialized;
};

Comlink.transferHandlers.set('StreamedPHPResponse', {
canHandle: (obj: unknown): obj is StreamedPHPResponse =>
obj instanceof StreamedPHPResponse,
serialize(obj: StreamedPHPResponse): [any, Transferable[]] {
const supportsStreams = supportsTransferableStreams();
const exitCodePort = promiseToPort(obj.exitCode);
if (supportsStreams) {
const payload = {
__type: 'StreamedPHPResponse',
headers: (obj as any)['headersStream'],
stdout: obj.stdout,
stderr: obj.stderr,
exitCodePort,
};
return [payload, [exitCodePort]];
}
// Fallback: bridge streams via MessagePorts
const headersPort = streamToPort((obj as any)['headersStream']);
const stdoutPort = streamToPort(obj.stdout);
const stderrPort = streamToPort(obj.stderr);
const payload = {
__type: 'StreamedPHPResponse',
headersPort,
stdoutPort,
stderrPort,
exitCodePort,
};
return [
payload,
[headersPort, stdoutPort, stderrPort, exitCodePort],
];
},
deserialize(data: any): StreamedPHPResponse {
if (data.headers && data.stdout && data.stderr) {
const exitCode = portToPromise(
data.exitCodePort as MessagePort
);
return new StreamedPHPResponse(
data.headers as ReadableStream<Uint8Array>,
data.stdout as ReadableStream<Uint8Array>,
data.stderr as ReadableStream<Uint8Array>,
exitCode
);
}
const headers = portToStream(data.headersPort as MessagePort);
const stdout = portToStream(data.stdoutPort as MessagePort);
const stderr = portToStream(data.stderrPort as MessagePort);
const exitCode = portToPromise(data.exitCodePort as MessagePort);
return new StreamedPHPResponse(headers, stdout, stderr, exitCode);
},
});
}

// Utilities for transferring ReadableStreams and Promises via MessagePorts:

/**
* Safari does not support transferable streams, so we need to fallback to
* MessagePorts.
* Feature-detects whether this runtime supports transferring ReadableStreams
* directly through postMessage (aka "transferable streams"). When false,
* we must fall back to port-bridged streaming.
*/
function supportsTransferableStreams(): boolean {
try {
if (typeof ReadableStream === 'undefined') return false;
const { port1 } = new MessageChannel();
const rs = new ReadableStream();
port1.postMessage(rs as any);
try {
port1.close();
} catch (_e) {
void _e;
}
return true;
} catch (_e) {
void _e;
return false;
}
}

/**
* Bridges a ReadableStream to a MessagePort by reading chunks and posting
* messages to the port. Used as a fallback when transferable streams are not
* supported (e.g., Safari).
*
* Protocol of the returned MessagePort:
*
* { t: 'chunk', b: ArrayBuffer } – next binary chunk
* { t: 'close' } – end of stream
* { t: 'error', m: string } – terminal error
*/
function streamToPort(stream: ReadableStream<Uint8Array>): MessagePort {
const { port1, port2 } = new MessageChannel();
(async () => {
const reader = stream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
try {
port1.postMessage({ t: 'close' });
} catch {
// Ignore error
}
try {
port1.close();
} catch {
// Ignore error
}
break;
}
if (value) {
// Ensure we transfer an owned buffer
const owned =
value.byteOffset === 0 &&
value.byteLength === value.buffer.byteLength
? value
: value.slice();
const buf = owned.buffer;
try {
port1.postMessage({ t: 'chunk', b: buf }, [
buf as unknown as Transferable,
]);
} catch {
port1.postMessage({
t: 'chunk',
b: owned.buffer.slice(0),
});
}
}
}
} catch (e: any) {
try {
port1.postMessage({ t: 'error', m: e?.message || String(e) });
} catch {
// Ignore error
}
} finally {
try {
port1.close();
} catch {
// Ignore error
}
}
})();
return port2;
}

/**
* Reconstructs a ReadableStream from a MessagePort using the inverse of the
* streamToPort protocol. Each message enqueues data, closes, or errors.
*/
function portToStream(port: MessagePort): ReadableStream<Uint8Array> {
return new ReadableStream<Uint8Array>({
start(controller) {
const onMessage = (ev: MessageEvent) => {
const data: any = (ev as any).data;
if (!data) return;
switch (data.t) {
case 'chunk':
controller.enqueue(new Uint8Array(data.b));
break;
case 'close':
controller.close();
cleanup();
break;
case 'error':
controller.error(new Error(data.m || 'Stream error'));
cleanup();
break;
}
};
const cleanup = () => {
try {
port.removeEventListener?.('message', onMessage as any);
} catch {
// Ignore error
}
try {
port.onmessage = null;
} catch {
// Ignore error
}
try {
port.close();
} catch {
// Ignore error
}
};
if (port.addEventListener) {
port.addEventListener('message', onMessage as any);
} else if ((port as any).on) {
(port as any).on('message', (data: any) =>
onMessage({ data } as any)
);
} else {
port.onmessage = onMessage as any;
}
if (typeof port.start === 'function') {
port.start();
}
},
cancel() {
try {
port.close();
} catch {
// Ignore error
}
},
});
}

/**
* Bridges a Promise to a MessagePort so it can be delivered across threads.
*
* Protocol of the returned MessagePort:
*
* { t: 'resolve', v: any } – promise resolved with value v
* { t: 'reject', m: str } – promise rejected with message m
*/
function promiseToPort(promise: Promise<any>): MessagePort {
const { port1, port2 } = new MessageChannel();
promise
.then((value) => {
try {
port1.postMessage({ t: 'resolve', v: value });
} catch {
// Ignore error
}
})
.catch((err) => {
try {
port1.postMessage({
t: 'reject',
m: (err as any)?.message || String(err),
});
} catch {
// Ignore error
}
})
.finally(() => {
try {
port1.close();
} catch {
// Ignore error
}
});
return port2;
}

/**
* Reconstructs a Promise from a MessagePort using the inverse of
* promiseToPort. Resolves or rejects when the corresponding message arrives.
*/
function portToPromise(port: MessagePort): Promise<any> {
return new Promise((resolve, reject) => {
const onMessage = (ev: MessageEvent) => {
const data: any = (ev as any).data;
if (!data) return;
if (data.t === 'resolve') {
cleanup();
resolve(data.v);
} else if (data.t === 'reject') {
cleanup();
reject(new Error(data.m || ''));
}
};
const cleanup = () => {
try {
port.removeEventListener?.('message', onMessage as any);
} catch {
// Ignore error
}
try {
port.onmessage = null;
} catch {
// Ignore error
}
try {
port.close();
} catch {
// Ignore error
}
};
if (port.addEventListener) {
port.addEventListener('message', onMessage as any);
} else if ((port as any).on) {
(port as any).on('message', (data: any) =>
onMessage({ data } as any)
);
} else {
port.onmessage = onMessage as any;
}
if (typeof port.start === 'function') {
port.start();
}
});
}

// Augment Comlink's throw handler to include all the information carried by
Expand Down
18 changes: 18 additions & 0 deletions packages/playground/website/playwright/e2e/client.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { test, expect } from '../playground-fixtures.ts';

test('playground.cli() streams stdout', async ({ website }) => {
await website.goto('./');
// Ensure the Playground client is connected and exposed on window
await website.page.waitForFunction(() =>
Boolean((window as any).playground)
);

const output = await website.page.evaluate(async () => {
const playground = (window as any).playground;
await playground.writeFile('/tmp/script.php', "<?php echo 'hi!'; ");
const response = await playground.cli(['php', '/tmp/script.php']);
return await response.stdoutText;
});

await expect(output).toContain('hi!');
});
Loading