Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ This update provides much improved schema interfaces. The migration should be mi
- Allow for more customization of error messages: https://github.com/agentclientprotocol/typescript-sdk/pull/12
- Update to latest ACP JSON Schema: https://github.com/agentclientprotocol/typescript-sdk/pull/10

## 0.4.9 (20205-10-21)
## 0.4.9 (2025-10-21)

- Fix: incorrect method for session/set_model client implementation.

Expand Down
2 changes: 1 addition & 1 deletion scripts/generate.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env node

import { createClient, defineConfig } from "@hey-api/openapi-ts";
import { createClient } from "@hey-api/openapi-ts";
import * as fs from "fs/promises";
import { dirname } from "path";
import * as prettier from "prettier";
Expand Down
151 changes: 151 additions & 0 deletions src/stream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import { describe, it, expect } from "vitest";
import { ndJsonStream } from "./stream.js";
import type { AnyMessage } from "./jsonrpc.js";

function readableFromChunks(chunks: Uint8Array[]): ReadableStream<Uint8Array> {
return new ReadableStream({
start(controller) {
for (const chunk of chunks) {
controller.enqueue(chunk);
}
controller.close();
},
});
}

async function collectMessages(
readable: ReadableStream<AnyMessage>,
): Promise<AnyMessage[]> {
const messages: AnyMessage[] = [];
const reader = readable.getReader();
while (true) {
const { value, done } = await reader.read();
if (done) break;
messages.push(value);
}
return messages;
}

describe("ndJsonStream", () => {
const nullWritable = new WritableStream<Uint8Array>();

it("parses a single message", async () => {
const msg = { jsonrpc: "2.0" as const, id: 1, method: "test" };
const input = readableFromChunks([
new TextEncoder().encode(JSON.stringify(msg) + "\n"),
]);

const { readable } = ndJsonStream(nullWritable, input);
const messages = await collectMessages(readable);

expect(messages).toEqual([msg]);
});

it("parses multiple messages", async () => {
const msg1 = { jsonrpc: "2.0" as const, id: 1, method: "first" };
const msg2 = { jsonrpc: "2.0" as const, id: 2, method: "second" };
const input = readableFromChunks([
new TextEncoder().encode(
JSON.stringify(msg1) + "\n" + JSON.stringify(msg2) + "\n",
),
]);

const { readable } = ndJsonStream(nullWritable, input);
const messages = await collectMessages(readable);

expect(messages).toEqual([msg1, msg2]);
});

it("parses a message split across chunks", async () => {
const msg = { jsonrpc: "2.0" as const, id: 1, method: "split" };
const full = JSON.stringify(msg) + "\n";
const mid = Math.floor(full.length / 2);
const encoder = new TextEncoder();

const input = readableFromChunks([
encoder.encode(full.slice(0, mid)),
encoder.encode(full.slice(mid)),
]);

const { readable } = ndJsonStream(nullWritable, input);
const messages = await collectMessages(readable);

expect(messages).toEqual([msg]);
});

it("handles multi-byte UTF-8 characters split across chunks", async () => {
const msg = {
jsonrpc: "2.0" as const,
id: 1,
method: "test",
params: { text: "héllo wörld" },
};
const bytes = new TextEncoder().encode(JSON.stringify(msg) + "\n");

// Find the byte offset of 'é' (0xC3 0xA9) and split between its two bytes
const éOffset = bytes.indexOf(0xc3);
expect(éOffset).toBeGreaterThan(0);

const input = readableFromChunks([
bytes.slice(0, éOffset + 1), // includes 0xC3 but not 0xA9
bytes.slice(éOffset + 1), // starts with 0xA9
]);

const { readable } = ndJsonStream(nullWritable, input);
const messages = await collectMessages(readable);

expect(messages).toEqual([msg]);
});

it("parses a final message without trailing newline", async () => {
const msg = { jsonrpc: "2.0" as const, id: 1, method: "unterminated" };
const input = readableFromChunks([
new TextEncoder().encode(JSON.stringify(msg)), // no \n
]);

const { readable } = ndJsonStream(nullWritable, input);
const messages = await collectMessages(readable);

expect(messages).toEqual([msg]);
});

it("parses a final message without trailing newline with multi-byte chars split across chunks", async () => {
const msg = {
jsonrpc: "2.0" as const,
id: 1,
method: "tëst",
};
const bytes = new TextEncoder().encode(JSON.stringify(msg)); // no \n
const éOffset = bytes.indexOf(0xc3);
expect(éOffset).toBeGreaterThan(0);

const input = readableFromChunks([
bytes.slice(0, éOffset + 1), // includes 0xC3 but not 0xAB
bytes.slice(éOffset + 1),
]);

const { readable } = ndJsonStream(nullWritable, input);
const messages = await collectMessages(readable);

expect(messages).toEqual([msg]);
});

it("skips malformed lines and continues parsing", async () => {
const msg1 = { jsonrpc: "2.0" as const, id: 1, method: "before" };
const msg2 = { jsonrpc: "2.0" as const, id: 2, method: "after" };
const input = readableFromChunks([
new TextEncoder().encode(
JSON.stringify(msg1) +
"\n" +
"not valid json\n" +
JSON.stringify(msg2) +
"\n",
),
]);

const { readable } = ndJsonStream(nullWritable, input);
const messages = await collectMessages(readable);

expect(messages).toEqual([msg1, msg2]);
});
});
10 changes: 10 additions & 0 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export function ndJsonStream(
while (true) {
const { value, done } = await reader.read();
if (done) {
content += textDecoder.decode();
break;
}
if (!value) {
Expand All @@ -63,6 +64,15 @@ export function ndJsonStream(
}
}
}
const trimmedLine = content.trim();
if (trimmedLine) {
try {
const message = JSON.parse(trimmedLine) as AnyMessage;
controller.enqueue(message);
} catch (err) {
console.error("Failed to parse JSON message:", trimmedLine, err);
}
}
} catch (err) {
controller.error(err);
return;
Expand Down
Loading