From 60ac84416f8797ba85648c4cac6b3b6341ebd98c Mon Sep 17 00:00:00 2001 From: umair Date: Thu, 19 Mar 2026 20:39:36 +0000 Subject: [PATCH 1/6] Simplify SDK log handler and fix error hint formatting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reduce SDK log handler to only surface messages in --verbose mode — most SDK errors already propagate via promise rejections or state changes handled by fail()/setupChannelStateLogging(). Skip closing/closed connection states (CLI-initiated teardown). Strip hint newlines in JSON output. Migrate hint strings to single quotes for clean JSON. Add error hint for 93002 (mutableMessages). --- src/base-command.ts | 79 ++++++++++++++++----------------------------- src/utils/errors.ts | 14 ++++---- 2 files changed, 36 insertions(+), 57 deletions(-) diff --git a/src/base-command.ts b/src/base-command.ts index 27ac76a7..87f7fa92 100644 --- a/src/base-command.ts +++ b/src/base-command.ts @@ -872,7 +872,6 @@ export abstract class AblyBaseCommand extends InteractiveBaseCommand { protected getClientOptions(flags: BaseFlags): Ably.ClientOptions { const options: Ably.ClientOptions = {}; - const isJsonMode = this.shouldOutputJson(flags); // Handle authentication: ABLY_TOKEN env → flags["api-key"] (set by ensureAppAndKey) → ABLY_API_KEY env → config if (process.env.ABLY_TOKEN) { @@ -920,54 +919,22 @@ export abstract class AblyBaseCommand extends InteractiveBaseCommand { options.tls = flags.tls === "true"; } - // Always add a log handler to control SDK output formatting and destination + // SDK log handler: only surface logs in --verbose mode. + // Most SDK errors (NACKs, auth, connection) also propagate via promise + // rejections or state changes, so fail() / setupChannelStateLogging() + // handle them with structured output + actionable hints. + // A few SDK errors (presence/annotation decode failures) only surface here, + // but those are rare edge cases — users can use --verbose to diagnose them. options.logHandler = (message: string, level: number) => { - if (isJsonMode) { - // JSON Mode Handling - if (flags.verbose && level <= 2) { - // Verbose JSON: Log ALL SDK messages via logCliEvent - const logData = { sdkLogLevel: level, sdkMessage: message }; - this.logCliEvent( - flags, - "ablySdk", - `LogLevel-${level}`, - message, - logData, - ); - } else if (level <= 1) { - // Standard JSON: Log only SDK ERRORS (level <= 1) to stderr as JSON - const errorData = { - level, - logType: "sdkError", - message, - timestamp: new Date().toISOString(), - }; - // Log to stderr with standard JSON envelope for consistency - this.logToStderr( - this.formatJsonRecord(JsonRecordType.Log, errorData, flags), - ); - } - // If not verbose JSON and level > 1, suppress non-error SDK logs - } else { - // Non-JSON Mode Handling - if (flags.verbose && level <= 2) { - // Verbose Non-JSON: Log ALL SDK messages via logCliEvent (human-readable) - const logData = { sdkLogLevel: level, sdkMessage: message }; - // logCliEvent handles non-JSON formatting when verbose is true - this.logCliEvent( - flags, - "ablySdk", - `LogLevel-${level}`, - message, - logData, - ); - } else if (level <= 1) { - // SDK errors are handled by setupChannelStateLogging() and fail() - // Only show raw SDK errors in verbose mode (handled above) - // In non-verbose mode, log to stderr for debugging without polluting stdout - this.logToStderr(`${chalk.red.bold(`[AblySDK Error]`)} ${message}`); - } - // If not verbose non-JSON and level > 1, suppress non-error SDK logs + if (flags.verbose && level <= 2) { + const logData = { sdkLogLevel: level, sdkMessage: message }; + this.logCliEvent( + flags, + "ablySdk", + `LogLevel-${level}`, + message, + logData, + ); } }; @@ -1363,6 +1330,16 @@ export abstract class AblyBaseCommand extends InteractiveBaseCommand { const connectionStateHandler = ( stateChange: Ably.ConnectionStateChange, ) => { + // Skip closing/closed — these are CLI-initiated teardown, not actionable. + // The SDK attaches an _ErrorInfo reason even on normal close, which looks + // alarming in verbose output. + if ( + stateChange.current === "closing" || + stateChange.current === "closed" + ) { + return; + } + this.logCliEvent( flags, component, @@ -1379,7 +1356,7 @@ export abstract class AblyBaseCommand extends InteractiveBaseCommand { break; } case "disconnected": { - this.log(formatWarning("Disconnected from Ably")); + this.log(formatWarning("Disconnected from Ably.")); break; } case "failed": { @@ -1391,7 +1368,7 @@ export abstract class AblyBaseCommand extends InteractiveBaseCommand { break; } case "suspended": { - this.log(formatWarning("Connection suspended")); + this.log(formatWarning("Connection suspended.")); break; } case "connecting": { @@ -1567,7 +1544,7 @@ export abstract class AblyBaseCommand extends InteractiveBaseCommand { if (this.shouldOutputJson(flags)) { const jsonData = cmdError.toJsonData(); if (friendlyHint) { - jsonData.hint = friendlyHint; + jsonData.hint = friendlyHint.replaceAll("\n", " "); } this.log(this.formatJsonRecord(JsonRecordType.Error, jsonData, flags)); this.exit(1); diff --git a/src/utils/errors.ts b/src/utils/errors.ts index 757f9a50..e84c9256 100644 --- a/src/utils/errors.ts +++ b/src/utils/errors.ts @@ -11,20 +11,22 @@ export function errorMessage(error: unknown): string { */ const hints: Record = { 40101: - 'The credentials provided are not valid. Check your API key or token, or re-authenticate with "ably login".', - 40103: 'The token has expired. Please re-authenticate with "ably login".', + "The credentials provided are not valid. Check your API key or token, or re-authenticate with 'ably login'.", + 40103: "The token has expired. Please re-authenticate with 'ably login'.", 40110: - 'Unable to authorize. Check your authentication configuration or re-authenticate with "ably login".', + "Unable to authorize. Check your authentication configuration or re-authenticate with 'ably login'.", 40160: - 'Run "ably auth keys list" to check your key\'s capabilities for this resource, or update them in the Ably dashboard.', + "Run 'ably auth keys list' to check your key's capabilities for this resource, or update them in the Ably dashboard.", 40161: - 'Run "ably auth keys list" to check your key\'s publish capability, or update it in the Ably dashboard.', + "Run 'ably auth keys list' to check your key's publish capability, or update it in the Ably dashboard.", 40171: - 'Run "ably auth keys list" to check your key\'s capabilities, or update them in the Ably dashboard.', + "Run 'ably auth keys list' to check your key's capabilities, or update them in the Ably dashboard.", 40300: "This application has been disabled. Check the app status in the Ably dashboard at https://ably.com/dashboard", 80003: "The connection was lost. Check your network connection and try again.", + 93002: + "This channel requires mutableMessages to be enabled.\nRun 'ably apps rules list' to check your channel rules,\nor enable it with 'ably apps rules create' or 'ably apps rules update'.", }; export function getFriendlyAblyErrorHint(code?: number): string | undefined { From bc93575247ed58c1abddb12b2ca292c83f293554 Mon Sep 17 00:00:00 2001 From: umair Date: Thu, 19 Mar 2026 20:39:43 +0000 Subject: [PATCH 2/6] Add token streaming mode to channels publish Add --token-streaming flag that publishes an initial message then streams remaining text as appendMessage() calls over --stream-duration seconds. Includes text chunker utility for word-boundary-aware splitting and --token-size flag to control chunk granularity. Refactors publishWithRealtime to use setup*StateLogging helpers. --- src/commands/channels/publish.ts | 222 ++++++++++++++++-- src/utils/text-chunker.ts | 50 ++++ test/helpers/mock-ably-realtime.ts | 4 + test/unit/commands/channels/publish.test.ts | 246 ++++++++++++++++++++ test/unit/utils/text-chunker.test.ts | 106 +++++++++ 5 files changed, 609 insertions(+), 19 deletions(-) create mode 100644 src/utils/text-chunker.ts create mode 100644 test/unit/utils/text-chunker.test.ts diff --git a/src/commands/channels/publish.ts b/src/commands/channels/publish.ts index 2e78c2c6..d6fa0652 100644 --- a/src/commands/channels/publish.ts +++ b/src/commands/channels/publish.ts @@ -12,6 +12,7 @@ import { formatResource, formatSuccess, } from "../../utils/output.js"; +import { chunkText } from "../../utils/text-chunker.js"; export default class ChannelsPublish extends AblyBaseCommand { static override args = { @@ -38,12 +39,20 @@ export default class ChannelsPublish extends AblyBaseCommand { '$ ably channels publish my-channel "Hello World" --json', '$ ably channels publish my-channel "Hello World" --pretty-json', '$ ably channels publish my-channel \'{"data":"Push notification","extras":{"push":{"notification":{"title":"Hello","body":"World"}}}}\'', + '$ ably channels publish my-channel "The quick brown fox jumps over the lazy dog" --token-streaming --stream-duration 5', + '$ ably channels publish --name ai-response my-channel "The quick brown fox" --token-streaming', '$ ABLY_API_KEY="YOUR_API_KEY" ably channels publish my-channel \'{"data":"Simple message"}\'', ]; static override flags = { ...productApiFlags, ...clientIdFlag, + "token-size": Flags.integer({ + default: 4, + dependsOn: ["token-streaming"], + description: "Approximate characters per token", + min: 1, + }), count: Flags.integer({ char: "c", default: 1, @@ -63,6 +72,18 @@ export default class ChannelsPublish extends AblyBaseCommand { char: "n", description: "The event name (if not specified in the message JSON)", }), + "stream-duration": Flags.integer({ + default: 10, + dependsOn: ["token-streaming"], + description: "Total duration in seconds over which to stream tokens", + min: 1, + }), + "token-streaming": Flags.boolean({ + default: false, + description: + "Enable token streaming: publish initial message then stream remaining text as appends (message-per-response pattern)", + exclusive: ["transport"], + }), transport: Flags.string({ description: "Transport method to use for publishing (rest or realtime)", options: ["rest", "realtime"], @@ -70,7 +91,6 @@ export default class ChannelsPublish extends AblyBaseCommand { }; private progressIntervalId: NodeJS.Timeout | null = null; - private realtime: Ably.Realtime | null = null; // Override finally to ensure resources are cleaned up async finally(err: Error | undefined): Promise { @@ -88,6 +108,21 @@ export default class ChannelsPublish extends AblyBaseCommand { async run(): Promise { const { args, flags } = await this.parse(ChannelsPublish); + // Validate --token-streaming mutual exclusivity with --count > 1 + if (flags["token-streaming"] && flags.count > 1) { + this.fail( + "Cannot use --token-streaming with --count > 1", + flags, + "channelPublish", + ); + } + + // Stream text mode always uses realtime + if (flags["token-streaming"]) { + await this.publishTokenStream(args, flags); + return; + } + // Use Realtime transport by default when publishing multiple messages to ensure ordering // If transport is not explicitly set and count > 1, use realtime // If transport is explicitly set, respect that choice @@ -278,13 +313,13 @@ export default class ChannelsPublish extends AblyBaseCommand { ); } - private async publishWithRealtime( + private async publishTokenStream( args: Record, flags: Record, ): Promise { try { - this.realtime = await this.createAblyRealtimeClient(flags as BaseFlags); - if (!this.realtime) { + const client = await this.createAblyRealtimeClient(flags as BaseFlags); + if (!client) { this.fail( "Failed to create Ably client. Please check your API key and try again.", flags as BaseFlags, @@ -292,16 +327,171 @@ export default class ChannelsPublish extends AblyBaseCommand { ); } - const client = this.realtime; + this.setupConnectionStateLogging(client, flags as BaseFlags, { + includeUserFriendlyMessages: true, + }); - client.connection.on((stateChange: Ably.ConnectionStateChange) => { - this.logCliEvent( + const channel = client.channels.get(args.channel as string); + + this.setupChannelStateLogging(channel, flags as BaseFlags, { + includeUserFriendlyMessages: true, + }); + + // Get the text to stream + const message = prepareMessageFromInput( + args.message as string, + flags, + {}, + ); + const text = String(message.data ?? args.message); + const tokenSize = flags["token-size"] as number; + const tokens = chunkText(text, tokenSize); + + if (tokens.length === 0) { + this.fail("No text to stream", flags as BaseFlags, "channelPublish"); + } + + const streamDuration = flags["stream-duration"] as number; + + if (!this.shouldOutputJson(flags)) { + this.log( + formatProgress( + `Streaming ${tokens.length} tokens to channel ${formatResource(args.channel as string)} over ${streamDuration}s`, + ), + ); + } + + // Publish initial message + const firstToken = tokens[0]; + const publishResult = await channel.publish({ + data: firstToken, + ...(message.name ? { name: message.name } : {}), + } as Ably.Message); + const serial = publishResult?.serials?.[0]; + + if (!serial) { + this.fail( + "Publish did not return a serial — streaming appends require a serial", + flags as BaseFlags, + "channelPublish", + ); + } + + this.logCliEvent( + flags, + "publish", + "streamInitialPublished", + `Initial token published to channel ${args.channel}`, + { serial, token: firstToken, tokenIndex: 0 }, + ); + + if (this.shouldOutputJson(flags)) { + this.logJsonEvent( + { + message: { + action: "message.create", + serial, + channel: args.channel, + data: firstToken, + tokenIndex: 0, + totalTokens: tokens.length, + }, + }, flags, - "connection", - stateChange.current, - `Connection state changed to ${stateChange.current}`, - { reason: stateChange.reason }, ); + } else { + this.log( + formatSuccess( + `Initial token published (serial: ${formatResource(serial)}).`, + ), + ); + } + + // Stream remaining tokens as appends + if (tokens.length > 1) { + const remainingTokens = tokens.slice(1); + const delay = (streamDuration * 1000) / remainingTokens.length; + + for (let i = 0; i < remainingTokens.length; i++) { + await new Promise((resolve) => setTimeout(resolve, delay)); + + const token = remainingTokens[i]; + try { + // Cast needed: SDK appendMessage() expects full Message but only reads serial/data fields + await channel.appendMessage({ + serial, + data: token, + } as Ably.Message); + + this.logCliEvent( + flags, + "publish", + "streamAppendPublished", + `Append token ${i + 1} published to channel ${args.channel}`, + { serial, token, tokenIndex: i + 1 }, + ); + + if (this.shouldOutputJson(flags)) { + this.logJsonEvent( + { + message: { + action: "message.append", + serial, + channel: args.channel, + data: token, + tokenIndex: i + 1, + totalTokens: tokens.length, + }, + }, + flags, + ); + } + } catch (appendError) { + this.fail(appendError, flags as BaseFlags, "channelPublish"); + } + } + } + + // Final summary + const summaryData = { + stream: { + serial, + channel: args.channel, + totalTokens: tokens.length, + streamDuration, + }, + }; + + if (this.shouldOutputJson(flags)) { + this.logJsonResult(summaryData, flags); + } else { + this.log( + formatSuccess( + `Streamed ${tokens.length} tokens to channel ${formatResource(args.channel as string)} (serial: ${formatResource(serial)}).`, + ), + ); + } + } catch (error) { + this.fail(error, flags as BaseFlags, "channelPublish"); + } + } + + private async publishWithRealtime( + args: Record, + flags: Record, + ): Promise { + try { + const client = await this.createAblyRealtimeClient(flags as BaseFlags); + if (!client) { + this.fail( + "Failed to create Ably client. Please check your API key and try again.", + flags as BaseFlags, + "channelPublish", + ); + } + + this.setupConnectionStateLogging(client, flags as BaseFlags, { + includeUserFriendlyMessages: true, }); this.logCliEvent( @@ -312,14 +502,8 @@ export default class ChannelsPublish extends AblyBaseCommand { ); const channel = client.channels.get(args.channel as string); - channel.on((stateChange: Ably.ChannelStateChange) => { - this.logCliEvent( - flags, - "channel", - stateChange.current, - `Channel '${args.channel}' state changed to ${stateChange.current}`, - { reason: stateChange.reason }, - ); + this.setupChannelStateLogging(channel, flags as BaseFlags, { + includeUserFriendlyMessages: true, }); await this.publishMessages(args, flags, async (msg) => { diff --git a/src/utils/text-chunker.ts b/src/utils/text-chunker.ts new file mode 100644 index 00000000..528969fb --- /dev/null +++ b/src/utils/text-chunker.ts @@ -0,0 +1,50 @@ +/** + * Split text into chunks of approximately `chunkSize` characters. + * Prefers breaking at word boundaries (space, punctuation) when within 2 chars of target. + * Returns array of string chunks. + */ +export function chunkText(text: string, chunkSize: number): string[] { + if (!text) return []; + if (chunkSize <= 0) return [text]; + if (text.length <= chunkSize) return [text]; + + const chunks: string[] = []; + let start = 0; + + while (start < text.length) { + if (start + chunkSize >= text.length) { + chunks.push(text.slice(start)); + break; + } + + let end = start + chunkSize; + + // Look for a word boundary within 2 chars of the target + const searchStart = Math.max(start, end - 2); + const searchEnd = Math.min(text.length, end + 2); + let bestBreak = -1; + + for (let i = searchStart; i <= searchEnd; i++) { + if ( + text[i] === " " || + text[i] === "," || + text[i] === "." || + text[i] === ";" || + text[i] === "!" || + text[i] === "?" + ) { + bestBreak = i + 1; // Include the boundary character in the current chunk + break; + } + } + + if (bestBreak > start) { + end = bestBreak; + } + + chunks.push(text.slice(start, end)); + start = end; + } + + return chunks; +} diff --git a/test/helpers/mock-ably-realtime.ts b/test/helpers/mock-ably-realtime.ts index b4af9110..4653e369 100644 --- a/test/helpers/mock-ably-realtime.ts +++ b/test/helpers/mock-ably-realtime.ts @@ -47,6 +47,7 @@ export interface MockRealtimeChannel { subscribe: Mock; unsubscribe: Mock; publish: Mock; + appendMessage: Mock; history: Mock; attach: Mock; detach: Mock; @@ -261,6 +262,9 @@ function createMockChannel(name: string): MockRealtimeChannel { } }), publish: vi.fn().mockResolvedValue({ serials: ["mock-serial-001"] }), + appendMessage: vi + .fn() + .mockResolvedValue({ versionSerial: "mock-version-serial-append" }), history: vi.fn().mockResolvedValue({ items: [] }), attach: vi.fn().mockImplementation(async function ( this: MockRealtimeChannel, diff --git a/test/unit/commands/channels/publish.test.ts b/test/unit/commands/channels/publish.test.ts index e8b3db6c..97011530 100644 --- a/test/unit/commands/channels/publish.test.ts +++ b/test/unit/commands/channels/publish.test.ts @@ -22,6 +22,9 @@ describe("ChannelsPublish", function () { standardFlagTests("channels:publish", import.meta.url, [ "--json", "--transport", + "--token-streaming", + "--stream-duration", + "--token-size", ]); describe("functionality", function () { @@ -382,6 +385,249 @@ describe("ChannelsPublish", function () { }); }); + describe("token streaming mode", function () { + it("should publish text as streamed token appends with --token-streaming", async function () { + const realtimeMock = getMockAblyRealtime(); + const channel = realtimeMock.channels._getChannel("test-channel"); + + const { stdout } = await runCommand( + [ + "channels:publish", + "test-channel", + "HelloWorldTestData", + "--token-streaming", + "--stream-duration", + "1", + "--token-size", + "6", + ], + import.meta.url, + ); + + // Should have published initial message + expect(channel.publish).toHaveBeenCalledOnce(); + // Should have appended remaining tokens + expect(channel.appendMessage).toHaveBeenCalled(); + expect(stdout).toContain("Streamed"); + expect(stdout).toContain("tokens"); + }); + + it("should reject --token-streaming with --count > 1", async function () { + const { error } = await runCommand( + [ + "channels:publish", + "test-channel", + "HelloWorld", + "--token-streaming", + "--count", + "3", + ], + import.meta.url, + ); + + expect(error).toBeDefined(); + expect(error?.message).toContain("--token-streaming"); + expect(error?.message).toContain("--count"); + }); + + it("should show stream progress in human-readable output", async function () { + getMockAblyRealtime(); + + const { stdout } = await runCommand( + [ + "channels:publish", + "test-channel", + "StreamingDemoTextHere", + "--token-streaming", + "--stream-duration", + "1", + "--token-size", + "5", + ], + import.meta.url, + ); + + expect(stdout).toContain("Streaming"); + expect(stdout).toContain("tokens"); + expect(stdout).toContain("Initial token published"); + }); + + it("should stream text with spaces when passed as JSON data", async function () { + const realtimeMock = getMockAblyRealtime(); + const channel = realtimeMock.channels._getChannel("test-channel"); + + const { stdout } = await runCommand( + [ + "channels:publish", + "test-channel", + '{"data":"The quick brown fox jumps over"}', + "--token-streaming", + "--stream-duration", + "1", + "--token-size", + "6", + ], + import.meta.url, + ); + + // Should have published initial message + expect(channel.publish).toHaveBeenCalledOnce(); + // Text has spaces so should produce multiple tokens + expect(channel.appendMessage).toHaveBeenCalled(); + expect(stdout).toContain("Streamed"); + expect(stdout).toContain("tokens"); + + // Verify the initial publish contains part of the text + const publishArgs = channel.publish.mock.calls[0][0]; + expect(publishArgs.data).toBeTruthy(); + + // Verify appended tokens are part of the original text + const appendedChunks = channel.appendMessage.mock.calls.map( + (call: unknown[]) => (call[0] as { data: string }).data, + ); + const allText = publishArgs.data + appendedChunks.join(""); + expect(allText).toBe("The quick brown fox jumps over"); + }); + + it("should work with --token-streaming and explicit --count 1", async function () { + getMockAblyRealtime(); + + const { stdout, error } = await runCommand( + [ + "channels:publish", + "test-channel", + "HelloWorldTest", + "--token-streaming", + "--count", + "1", + "--stream-duration", + "1", + "--token-size", + "5", + ], + import.meta.url, + ); + + expect(error).toBeUndefined(); + expect(stdout).toContain("Streamed"); + expect(stdout).toContain("tokens"); + }); + + it("should report partial progress when appendMessage fails mid-stream", async function () { + const realtimeMock = getMockAblyRealtime(); + const channel = realtimeMock.channels._getChannel("test-channel"); + + let appendCallCount = 0; + channel.appendMessage.mockImplementation(async () => { + appendCallCount++; + if (appendCallCount >= 2) { + throw new Error("Network timeout"); + } + return { versionSerial: "mock-version" }; + }); + + const { error } = await runCommand( + [ + "channels:publish", + "test-channel", + '{"data":"The quick brown fox jumps over"}', + "--token-streaming", + "--stream-duration", + "1", + "--token-size", + "4", + ], + import.meta.url, + ); + + expect(error).toBeDefined(); + expect(error?.message).toContain("Network timeout"); + }); + + it("should take longer with a higher --stream-duration value", async function () { + getMockAblyRealtime(); + + const startTime = Date.now(); + await runCommand( + [ + "channels:publish", + "test-channel", + "HelloWorldTestData", + "--token-streaming", + "--stream-duration", + "2", + "--token-size", + "6", + ], + import.meta.url, + ); + const elapsed = Date.now() - startTime; + + // With --stream-duration 2, should take at least ~1.5s (accounting for overhead) + expect(elapsed).toBeGreaterThanOrEqual(1500); + }); + + it("should fail when initial publish returns no serial in streaming mode", async function () { + const realtimeMock = getMockAblyRealtime(); + const channel = realtimeMock.channels._getChannel("test-channel"); + channel.publish.mockResolvedValue({ serials: [] }); + + const { error } = await runCommand( + [ + "channels:publish", + "test-channel", + "HelloWorld", + "--token-streaming", + "--stream-duration", + "1", + ], + import.meta.url, + ); + + expect(error).toBeDefined(); + expect(error?.message).toContain("serial"); + }); + + it("should output JSON events per token with --token-streaming --json", async function () { + getMockAblyRealtime(); + + const { stdout } = await runCommand( + [ + "channels:publish", + "test-channel", + "HelloWorldText", + "--token-streaming", + "--stream-duration", + "1", + "--token-size", + "5", + "--json", + ], + import.meta.url, + ); + + // Parse NDJSON output + const lines = stdout + .trim() + .split("\n") + .filter((l: string) => l.trim()); + const records = lines.map((l: string) => JSON.parse(l)); + + // Should have events for each token plus a final result + const events = records.filter( + (r: Record) => r.type === "event", + ); + const results = records.filter( + (r: Record) => r.type === "result", + ); + + expect(events.length).toBeGreaterThanOrEqual(1); + expect(results.length).toBe(1); + expect(results[0]).toHaveProperty("stream.totalTokens"); + expect(results[0]).toHaveProperty("stream.serial"); + }); + }); + describe("extras and push data", function () { it("should include extras.push when provided in message data", async function () { const restMock = getMockAblyRest(); diff --git a/test/unit/utils/text-chunker.test.ts b/test/unit/utils/text-chunker.test.ts new file mode 100644 index 00000000..862066cf --- /dev/null +++ b/test/unit/utils/text-chunker.test.ts @@ -0,0 +1,106 @@ +import { describe, it, expect } from "vitest"; +import { chunkText } from "../../../src/utils/text-chunker.js"; + +describe("chunkText", () => { + describe("functionality", () => { + it("should split text into chunks of approximately the given size", () => { + const text = "The quick brown fox jumps over the lazy dog"; + const chunks = chunkText(text, 10); + + expect(chunks.length).toBeGreaterThan(1); + // All chunks together should equal original text + expect(chunks.join("")).toBe(text); + }); + + it("should prefer word boundaries", () => { + const text = "Hello world this is a test"; + const chunks = chunkText(text, 6); + + // Should break at space near chunk size + expect(chunks.length).toBeGreaterThan(1); + expect(chunks.join("")).toBe(text); + // First chunk should break at a word boundary + expect(chunks[0]).toMatch(/\s$/); + }); + + it("should handle text shorter than chunk size", () => { + const text = "Hi"; + const chunks = chunkText(text, 10); + + expect(chunks).toEqual(["Hi"]); + }); + + it("should handle empty string", () => { + const chunks = chunkText("", 5); + + expect(chunks).toEqual([]); + }); + + it("should handle single-word text", () => { + const text = "Supercalifragilisticexpialidocious"; + const chunks = chunkText(text, 10); + + expect(chunks.length).toBeGreaterThan(1); + expect(chunks.join("")).toBe(text); + }); + + it("should handle exact chunk size text", () => { + const text = "12345"; + const chunks = chunkText(text, 5); + + expect(chunks).toEqual(["12345"]); + }); + + it("should preserve spaces within chunks", () => { + const text = "Hello world, how are you today?"; + const chunks = chunkText(text, 8); + + // Verify all content including spaces is preserved + expect(chunks.join("")).toBe(text); + + // At least some chunks should contain spaces + const chunksWithSpaces = chunks.filter((c) => c.includes(" ")); + expect(chunksWithSpaces.length).toBeGreaterThan(0); + }); + + it("should produce chunks close to the target size", () => { + const text = "The quick brown fox jumps over the lazy dog and runs away"; + const chunks = chunkText(text, 8); + + for (const chunk of chunks) { + // Chunks should be within reasonable range of target (8 ± 4) + expect(chunk.length).toBeLessThanOrEqual(12); + } + expect(chunks.join("")).toBe(text); + }); + + it("should return the full text as a single chunk when chunkSize is 0", () => { + const text = "Hello world"; + const chunks = chunkText(text, 0); + + expect(chunks).toEqual(["Hello world"]); + }); + + it("should return the full text as a single chunk when chunkSize is negative", () => { + const text = "Hello world"; + const chunks = chunkText(text, -5); + + expect(chunks).toEqual(["Hello world"]); + }); + + it("should handle chunkSize of 1", () => { + const text = "Hello"; + const chunks = chunkText(text, 1); + + expect(chunks.length).toBeGreaterThan(1); + expect(chunks.join("")).toBe(text); + }); + + it("should handle very large chunkSize relative to text", () => { + const text = "Short"; + const chunks = chunkText(text, 10000); + + expect(chunks).toEqual(["Short"]); + }); + }); +}); From 9fd4364e01e7c8112dd423c4faa1a88c8fc2ed37 Mon Sep 17 00:00:00 2001 From: umair Date: Thu, 19 Mar 2026 20:39:50 +0000 Subject: [PATCH 3/6] Add token streaming mode to channels subscribe Add --token-streaming flag that accumulates message.append data for the same serial, displaying the growing response in-place on TTY terminals. Refactors message display into displayNormalMessage() and handleTokenStreamMessage() methods. Renames message 'event' field to 'name' in output helpers to align with SDK terminology. --- src/commands/channels/history.ts | 2 +- src/commands/channels/subscribe.ts | 279 +++++++++++--- src/utils/output.ts | 7 +- test/unit/commands/channels/history.test.ts | 2 +- test/unit/commands/channels/subscribe.test.ts | 361 +++++++++++++++++- 5 files changed, 588 insertions(+), 63 deletions(-) diff --git a/src/commands/channels/history.ts b/src/commands/channels/history.ts index ada6b603..39f62e3c 100644 --- a/src/commands/channels/history.ts +++ b/src/commands/channels/history.ts @@ -132,7 +132,7 @@ export default class ChannelsHistory extends AblyBaseCommand { channel: channelName, clientId: message.clientId, data: message.data, - event: message.name || "(none)", + ...(message.name ? { name: message.name } : {}), id: message.id, indexPrefix: `${formatIndex(index + 1)} ${formatTimestamp(formatMessageTimestamp(ts))}`, serial: message.serial, diff --git a/src/commands/channels/subscribe.ts b/src/commands/channels/subscribe.ts index 7990b87a..365331bd 100644 --- a/src/commands/channels/subscribe.ts +++ b/src/commands/channels/subscribe.ts @@ -1,5 +1,6 @@ import { Args, Flags } from "@oclif/core"; import * as Ably from "ably"; +import chalk from "chalk"; import { AblyBaseCommand } from "../../base-command.js"; import { clientIdFlag, @@ -8,6 +9,7 @@ import { rewindFlag, } from "../../flags.js"; import { + formatEventType, formatListening, formatProgress, formatResource, @@ -39,6 +41,7 @@ export default class ChannelsSubscribe extends AblyBaseCommand { "$ ably channels subscribe my-channel --json", "$ ably channels subscribe my-channel --pretty-json", "$ ably channels subscribe my-channel --duration 30", + "$ ably channels subscribe --token-streaming my-channel", '$ ABLY_API_KEY="YOUR_API_KEY" ably channels subscribe my-channel', ]; @@ -70,12 +73,20 @@ export default class ChannelsSubscribe extends AblyBaseCommand { default: false, description: "Include sequence numbers in output", }), + "token-streaming": Flags.boolean({ + default: false, + description: + "Enable token streaming mode: accumulates message.append data for the same serial, displaying the growing response in-place (requires message interactions enabled on the channel)", + }), }; static override strict = false; private client: Ably.Realtime | null = null; private sequenceCounter = 0; + private tokenStreamAccumulatedData = ""; + private tokenStreamAppendCount = 0; + private tokenStreamSerial: string | null = null; async run(): Promise { const parseResult = await this.parse(ChannelsSubscribe); @@ -180,65 +191,14 @@ export default class ChannelsSubscribe extends AblyBaseCommand { // Subscribe and collect promise (rejects on capability/auth errors) const subscribePromise = channel.subscribe((message: Ably.Message) => { this.sequenceCounter++; - const timestamp = formatMessageTimestamp(message.timestamp); - const messageData = { - id: message.id, - timestamp, - channel: channel.name, - event: message.name || "(none)", - clientId: message.clientId, - connectionId: message.connectionId, - data: message.data, - encoding: message.encoding, - action: - message.action === undefined ? undefined : String(message.action), - serial: message.serial, - version: message.version, - annotations: message.annotations, - ...(flags["sequence-numbers"] - ? { sequence: this.sequenceCounter } - : {}), - }; - this.logCliEvent( - flags, - "subscribe", - "messageReceived", - `Received message on channel ${channel.name}`, - messageData, - ); - if (this.shouldOutputJson(flags)) { - this.logJsonEvent( - { - message: messageData, - ...(flags["sequence-numbers"] - ? { sequence: this.sequenceCounter } - : {}), - }, - flags, - ); - } else { - const msgFields: MessageDisplayFields = { - action: - message.action === undefined - ? undefined - : String(message.action), - channel: channel.name, - clientId: message.clientId, - data: message.data, - event: message.name || "(none)", - id: message.id, - serial: message.serial, - timestamp: message.timestamp ?? Date.now(), - version: message.version, - annotations: message.annotations, - ...(flags["sequence-numbers"] - ? { sequencePrefix: `${formatIndex(this.sequenceCounter)} ` } - : {}), - }; - this.log(formatMessagesOutput([msgFields])); - this.log(""); // Empty line for readability between messages + // Stream mode: handle message.create/append accumulation + if (flags["token-streaming"] && message.serial) { + this.handleTokenStreamMessage(message, channel.name, flags); + return; } + + this.displayNormalMessage(message, channel.name, flags); }); subscribePromises.push(subscribePromise); } @@ -278,10 +238,215 @@ export default class ChannelsSubscribe extends AblyBaseCommand { // Wait until the user interrupts or the optional duration elapses await this.waitAndTrackCleanup(flags, "subscribe", flags.duration); + + // Finalize any in-progress stream before exit + if (flags["token-streaming"]) { + this.finalizeTokenStream(flags); + } } catch (error) { this.fail(error, flags, "channelSubscribe", { channels: channelNames, }); } } + + private finalizeTokenStream(flags: Record): void { + if (this.tokenStreamSerial === null) return; + + if (!this.shouldOutputJson(flags)) { + if (this.shouldUseTerminalUpdates()) { + // Using process.stdout.write to end the in-place line update started in handleTokenStreamMessage + process.stdout.write("\n"); + } + if (this.tokenStreamAppendCount > 0) { + this.log( + chalk.dim( + ` (${this.tokenStreamAppendCount} append${this.tokenStreamAppendCount === 1 ? "" : "s"})`, + ), + ); + } + this.log(""); + } + + this.tokenStreamSerial = null; + this.tokenStreamAccumulatedData = ""; + this.tokenStreamAppendCount = 0; + } + + private handleTokenStreamMessage( + message: Ably.Message, + channelName: string, + flags: Record, + ): void { + const action = + message.action === undefined ? undefined : String(message.action); + + // Only handle create/append in stream mode; everything else falls through + if (action !== "message.create" && action !== "message.append") { + // Non-streaming action: finalize any in-progress stream, then display normally + this.finalizeTokenStream(flags); + this.displayNormalMessage(message, channelName, flags); + return; + } + + const serial = message.serial!; + + // If serial changed, finalize the previous stream + if (this.tokenStreamSerial !== null && this.tokenStreamSerial !== serial) { + this.finalizeTokenStream(flags); + } + + const dataStr = message.data == null ? "" : String(message.data); + + const streamLabel = `${formatResource(channelName)} ${chalk.dim("[")}${formatEventType("token-stream")}${chalk.dim("]")}`; + + if (action === "message.create") { + this.tokenStreamSerial = serial; + this.tokenStreamAccumulatedData = dataStr; + this.tokenStreamAppendCount = 0; + + this.logCliEvent( + flags, + "subscribe", + "streamCreateReceived", + `Received stream create on channel ${channelName}`, + { action, serial, channel: channelName, data: dataStr }, + ); + + if (this.shouldOutputJson(flags)) { + this.logJsonEvent( + { + message: { + id: message.id, + action, + serial, + channel: channelName, + data: dataStr, + encoding: message.encoding, + ...(message.name ? { name: message.name } : {}), + clientId: message.clientId, + connectionId: message.connectionId, + timestamp: formatMessageTimestamp(message.timestamp), + }, + }, + flags, + ); + } else if (this.shouldUseTerminalUpdates()) { + // Using process.stdout.write instead of this.log() to avoid trailing newline, + // enabling in-place line updates via \r for streaming token display + process.stdout.write(`${streamLabel} ${dataStr}`); + } else { + this.log(`${streamLabel} ${dataStr}`); + } + } else { + // message.append + this.tokenStreamAccumulatedData += dataStr; + this.tokenStreamAppendCount++; + + this.logCliEvent( + flags, + "subscribe", + "streamAppendReceived", + `Received stream append on channel ${channelName}`, + { + action, + serial, + channel: channelName, + data: dataStr, + appendCount: this.tokenStreamAppendCount, + }, + ); + + if (this.shouldOutputJson(flags)) { + this.logJsonEvent( + { + message: { + id: message.id, + action, + serial, + channel: channelName, + data: dataStr, + encoding: message.encoding, + accumulatedData: this.tokenStreamAccumulatedData, + appendCount: this.tokenStreamAppendCount, + ...(message.name ? { name: message.name } : {}), + clientId: message.clientId, + connectionId: message.connectionId, + timestamp: formatMessageTimestamp(message.timestamp), + }, + }, + flags, + ); + } else if (this.shouldUseTerminalUpdates()) { + // Using process.stdout.write with \r to overwrite the current line in-place, + // showing accumulated token data without scrolling — this.log() can't do \r rewriting + process.stdout.write( + `\r${streamLabel} ${this.tokenStreamAccumulatedData}`, + ); + } else { + this.log(`${chalk.dim("+")} ${dataStr}`); + } + } + } + + private displayNormalMessage( + message: Ably.Message, + channelName: string, + flags: Record, + ): void { + const timestamp = formatMessageTimestamp(message.timestamp); + const messageData = { + id: message.id, + timestamp, + channel: channelName, + ...(message.name ? { name: message.name } : {}), + clientId: message.clientId, + connectionId: message.connectionId, + data: message.data, + encoding: message.encoding, + action: message.action === undefined ? undefined : String(message.action), + serial: message.serial, + version: message.version, + annotations: message.annotations, + ...(flags["sequence-numbers"] ? { sequence: this.sequenceCounter } : {}), + }; + this.logCliEvent( + flags, + "subscribe", + "messageReceived", + `Received message on channel ${channelName}`, + messageData, + ); + + if (this.shouldOutputJson(flags)) { + this.logJsonEvent( + { + message: messageData, + ...(flags["sequence-numbers"] + ? { sequence: this.sequenceCounter } + : {}), + }, + flags, + ); + } else { + const msgFields: MessageDisplayFields = { + action: + message.action === undefined ? undefined : String(message.action), + channel: channelName, + clientId: message.clientId, + data: message.data, + ...(message.name ? { name: message.name } : {}), + id: message.id, + serial: message.serial, + timestamp: message.timestamp ?? Date.now(), + version: message.version, + annotations: message.annotations, + ...(flags["sequence-numbers"] + ? { sequencePrefix: `${formatIndex(this.sequenceCounter)} ` } + : {}), + }; + this.log(formatMessagesOutput([msgFields])); + this.log(""); + } + } } diff --git a/src/utils/output.ts b/src/utils/output.ts index 7a213feb..0eaf7b0b 100644 --- a/src/utils/output.ts +++ b/src/utils/output.ts @@ -118,7 +118,7 @@ export interface MessageDisplayFields { channel: string; clientId?: string; data: unknown; - event: string; + name?: string; id?: string; indexPrefix?: string; sequencePrefix?: string; @@ -160,9 +160,12 @@ export function formatMessagesOutput(messages: MessageDisplayFields[]): string { lines.push( `${formatLabel("Timestamp")} ${formatMessageTimestamp(msg.timestamp)}`, `${formatLabel("Channel")} ${formatResource(msg.channel)}`, - `${formatLabel("Event")} ${formatEventType(msg.event)}`, ); + if (msg.name) { + lines.push(`${formatLabel("Name")} ${formatEventType(msg.name)}`); + } + if (msg.action) { lines.push(`${formatLabel("Action")} ${formatEventType(msg.action)}`); } diff --git a/test/unit/commands/channels/history.test.ts b/test/unit/commands/channels/history.test.ts index 672ff574..f194b3c2 100644 --- a/test/unit/commands/channels/history.test.ts +++ b/test/unit/commands/channels/history.test.ts @@ -92,7 +92,7 @@ describe("channels:history command", () => { import.meta.url, ); - expect(stdout).toContain("Event: test-event"); + expect(stdout).toContain("Name: test-event"); expect(stdout).toContain("Hello world"); expect(stdout).toContain("Client ID:"); expect(stdout).toContain("client-1"); diff --git a/test/unit/commands/channels/subscribe.test.ts b/test/unit/commands/channels/subscribe.test.ts index e66f2ab3..0d9323db 100644 --- a/test/unit/commands/channels/subscribe.test.ts +++ b/test/unit/commands/channels/subscribe.test.ts @@ -52,6 +52,7 @@ describe("channels:subscribe command", () => { "--delta", "--cipher-key", "--json", + "--token-streaming", ]); describe("functionality", () => { @@ -111,7 +112,7 @@ describe("channels:subscribe command", () => { // Should have received and displayed the message with channel, event, and data expect(stdout).toContain("test-channel"); - expect(stdout).toContain("Event: test-event"); + expect(stdout).toContain("Name: test-event"); expect(stdout).toContain("hello world"); expect(stdout).toContain("Timestamp:"); expect(stdout).toContain("Channel:"); @@ -183,7 +184,7 @@ describe("channels:subscribe command", () => { expect(record).toHaveProperty("type", "event"); expect(record).toHaveProperty("command", "channels:subscribe"); expect(record).toHaveProperty("message.channel", "test-channel"); - expect(record).toHaveProperty("message.event", "greeting"); + expect(record).toHaveProperty("message.name", "greeting"); expect(record).toHaveProperty("message.id", "msg-envelope-test"); expect(record).toHaveProperty("message.serial"); expect(record).toHaveProperty("message.version"); @@ -225,6 +226,362 @@ describe("channels:subscribe command", () => { }); }); + describe("token streaming mode", () => { + it("should display message.create data in token streaming mode", async () => { + const commandPromise = runCommand( + ["channels:subscribe", "test-channel", "--token-streaming"], + import.meta.url, + ); + + await vi.waitFor(() => { + expect(mockSubscribeCallback).not.toBeNull(); + }); + + mockSubscribeCallback!({ + action: "message.create", + serial: "serial-001", + name: "test-event", + data: "Hello", + timestamp: Date.now(), + id: "msg-stream-1", + }); + + const { stdout } = await commandPromise; + + expect(stdout).toContain("test-channel"); + expect(stdout).toContain("token-stream"); + expect(stdout).toContain("Hello"); + }); + + it("should stream message.append data for the same serial", async () => { + const commandPromise = runCommand( + ["channels:subscribe", "test-channel", "--token-streaming"], + import.meta.url, + ); + + await vi.waitFor(() => { + expect(mockSubscribeCallback).not.toBeNull(); + }); + + mockSubscribeCallback!({ + action: "message.create", + serial: "serial-001", + name: "test-event", + data: "Hello", + timestamp: Date.now(), + id: "msg-stream-1", + }); + + mockSubscribeCallback!({ + action: "message.append", + serial: "serial-001", + name: "test-event", + data: " world", + timestamp: Date.now(), + id: "msg-stream-2", + }); + + mockSubscribeCallback!({ + action: "message.append", + serial: "serial-001", + name: "test-event", + data: "!", + timestamp: Date.now(), + id: "msg-stream-3", + }); + + const { stdout } = await commandPromise; + + // In non-TTY mode, appends show as "+ chunk" lines + expect(stdout).toContain("Hello"); + expect(stdout).toContain(" world"); + expect(stdout).toContain("!"); + // Should show append count summary + expect(stdout).toContain("2 appends"); + }); + + it("should reset accumulation when serial changes", async () => { + const commandPromise = runCommand( + ["channels:subscribe", "test-channel", "--token-streaming"], + import.meta.url, + ); + + await vi.waitFor(() => { + expect(mockSubscribeCallback).not.toBeNull(); + }); + + mockSubscribeCallback!({ + action: "message.create", + serial: "serial-001", + name: "test-event", + data: "First", + timestamp: Date.now(), + id: "msg-1", + }); + + mockSubscribeCallback!({ + action: "message.create", + serial: "serial-002", + name: "test-event", + data: "Second", + timestamp: Date.now(), + id: "msg-2", + }); + + const { stdout } = await commandPromise; + + expect(stdout).toContain("First"); + expect(stdout).toContain("Second"); + }); + + it("should display messages without serial normally in token streaming mode", async () => { + const commandPromise = runCommand( + ["channels:subscribe", "test-channel", "--token-streaming"], + import.meta.url, + ); + + await vi.waitFor(() => { + expect(mockSubscribeCallback).not.toBeNull(); + }); + + mockSubscribeCallback!({ + name: "test-event", + data: "no-serial-message", + timestamp: Date.now(), + id: "msg-nosrial", + }); + + const { stdout } = await commandPromise; + + // Messages without serial should use normal display + expect(stdout).toContain("no-serial-message"); + expect(stdout).toContain("Name:"); + }); + + it("should accumulate chunks containing spaces correctly", async () => { + const records = await captureJsonLogs(async () => { + const commandPromise = runCommand( + ["channels:subscribe", "test-channel", "--token-streaming", "--json"], + import.meta.url, + ); + + await vi.waitFor(() => { + expect(mockSubscribeCallback).not.toBeNull(); + }); + + mockSubscribeCallback!({ + action: "message.create", + serial: "serial-spaces-001", + name: "test", + data: "The ", + timestamp: Date.now(), + id: "msg-sp-1", + }); + + mockSubscribeCallback!({ + action: "message.append", + serial: "serial-spaces-001", + name: "test", + data: "quick ", + timestamp: Date.now(), + id: "msg-sp-2", + }); + + mockSubscribeCallback!({ + action: "message.append", + serial: "serial-spaces-001", + name: "test", + data: "brown fox", + timestamp: Date.now(), + id: "msg-sp-3", + }); + + await commandPromise; + }); + + const appendEvents = records.filter( + (r) => + r.type === "event" && + (r as Record).message && + ((r as Record).message as Record) + .action === "message.append" && + ((r as Record).message as Record) + .serial === "serial-spaces-001", + ); + + // Last append should have the full accumulated text with spaces + const lastAppend = appendEvents.at(-1) as Record; + const msg = lastAppend.message as Record; + expect(msg.accumulatedData).toBe("The quick brown fox"); + }); + + it("should fall through to normal display for message.update during streaming mode", async () => { + const commandPromise = runCommand( + ["channels:subscribe", "test-channel", "--token-streaming"], + import.meta.url, + ); + + await vi.waitFor(() => { + expect(mockSubscribeCallback).not.toBeNull(); + }); + + // First a create, then an update (which should use normal display) + mockSubscribeCallback!({ + action: "message.create", + serial: "serial-upd-001", + name: "test-event", + data: "Initial", + timestamp: Date.now(), + id: "msg-upd-1", + }); + + mockSubscribeCallback!({ + action: "message.update", + serial: "serial-upd-001", + name: "test-event", + data: "Updated content", + timestamp: Date.now(), + id: "msg-upd-2", + }); + + const { stdout } = await commandPromise; + + // The update should fall through to normal display with Name: label + expect(stdout).toContain("Initial"); + expect(stdout).toContain("Updated content"); + expect(stdout).toContain("Name:"); + }); + + it("should fall through to normal display for message.delete during streaming mode", async () => { + const commandPromise = runCommand( + ["channels:subscribe", "test-channel", "--token-streaming"], + import.meta.url, + ); + + await vi.waitFor(() => { + expect(mockSubscribeCallback).not.toBeNull(); + }); + + mockSubscribeCallback!({ + action: "message.delete", + serial: "serial-del-001", + name: "test-event", + data: null, + timestamp: Date.now(), + id: "msg-del-1", + }); + + const { stdout } = await commandPromise; + + // Delete should use normal display + expect(stdout).toContain("Name:"); + }); + + it("should emit correct JSON shape for message.create events", async () => { + const records = await captureJsonLogs(async () => { + const commandPromise = runCommand( + ["channels:subscribe", "test-channel", "--token-streaming", "--json"], + import.meta.url, + ); + + await vi.waitFor(() => { + expect(mockSubscribeCallback).not.toBeNull(); + }); + + mockSubscribeCallback!({ + action: "message.create", + serial: "serial-create-json-001", + name: "greeting", + data: "hello", + timestamp: Date.now(), + id: "msg-create-json-1", + }); + + await commandPromise; + }); + + const createEvents = records.filter( + (r) => + r.type === "event" && + (r as Record).message && + ((r as Record).message as Record) + .action === "message.create" && + ((r as Record).message as Record) + .serial === "serial-create-json-001", + ); + + expect(createEvents.length).toBe(1); + const event = createEvents[0] as Record; + const msg = event.message as Record; + expect(msg).toHaveProperty("action", "message.create"); + expect(msg).toHaveProperty("serial", "serial-create-json-001"); + expect(msg).toHaveProperty("data", "hello"); + expect(msg).toHaveProperty("channel", "test-channel"); + expect(msg).toHaveProperty("name", "greeting"); + expect(msg).toHaveProperty("timestamp"); + }); + + it("should include action and serial in JSON output with --token-streaming", async () => { + const records = await captureJsonLogs(async () => { + const commandPromise = runCommand( + ["channels:subscribe", "test-channel", "--token-streaming", "--json"], + import.meta.url, + ); + + await vi.waitFor(() => { + expect(mockSubscribeCallback).not.toBeNull(); + }); + + mockSubscribeCallback!({ + action: "message.create", + serial: "serial-json-001", + name: "greeting", + data: "hi", + timestamp: Date.now(), + id: "msg-json-1", + }); + + mockSubscribeCallback!({ + action: "message.append", + serial: "serial-json-001", + name: "greeting", + data: " there", + timestamp: Date.now(), + id: "msg-json-2", + }); + + await commandPromise; + }); + + const streamEvents = records.filter( + (r) => + r.type === "event" && + (r as Record).message && + ((r as Record).message as Record) + .serial === "serial-json-001", + ); + expect(streamEvents.length).toBeGreaterThanOrEqual(2); + + const createEvent = streamEvents.find( + (r) => + ((r as Record).message as Record) + .action === "message.create", + ); + expect(createEvent).toBeDefined(); + expect(createEvent).toHaveProperty("message.data", "hi"); + + const appendEvent = streamEvents.find( + (r) => + ((r as Record).message as Record) + .action === "message.append", + ); + expect(appendEvent).toBeDefined(); + expect(appendEvent).toHaveProperty("message.data", " there"); + expect(appendEvent).toHaveProperty("message.accumulatedData", "hi there"); + expect(appendEvent).toHaveProperty("message.appendCount", 1); + }); + }); + describe("error handling", () => { it("should handle missing mock client in test mode", async () => { if (globalThis.__TEST_MOCKS__) { From 3d8642fa50e388462d772cdb5f7be7a0456c2cd2 Mon Sep 17 00:00:00 2001 From: umair Date: Thu, 19 Mar 2026 20:39:56 +0000 Subject: [PATCH 4/6] Update documentation and skills for token streaming Document error hint formatting conventions (single quotes, newline control) in AGENTS.md and skills. Update README with new flags and examples. Add text-chunker to Project-Structure.md. --- .claude/skills/ably-codebase-review/SKILL.md | 5 +++ .../ably-new-command/references/patterns.md | 17 +++++++ .claude/skills/ably-review/SKILL.md | 6 +++ AGENTS.md | 14 ++++++ README.md | 44 ++++++++++++------- docs/Project-Structure.md | 3 +- 6 files changed, 73 insertions(+), 16 deletions(-) diff --git a/.claude/skills/ably-codebase-review/SKILL.md b/.claude/skills/ably-codebase-review/SKILL.md index dbe59626..27bcf6f0 100644 --- a/.claude/skills/ably-codebase-review/SKILL.md +++ b/.claude/skills/ably-codebase-review/SKILL.md @@ -91,6 +91,11 @@ Launch these agents **in parallel**. Each agent gets a focused mandate and uses - `chalk.red("✗")` used as visual indicators (not error handling) is exempt - Component strings must be camelCase for consistency in verbose logs and JSON envelopes +**Error hints (`src/utils/errors.ts`):** +7. **Grep** for double-quoted CLI commands inside hint strings (e.g., `"ably login"`) — must use single quotes to avoid `\"` in JSON output +8. **Check** that long hints use `\n` for manual line breaks — oclif auto-wraps at awkward positions +9. **Verify** that `this.fail()` in base-command.ts strips `\n` from hints in JSON output (`.replaceAll("\n", " ")`) + ### Agent 3: Output Formatting Sweep **Goal:** Verify all human output uses the correct format helpers and is JSON-guarded. diff --git a/.claude/skills/ably-new-command/references/patterns.md b/.claude/skills/ably-new-command/references/patterns.md index b3be44e1..48dda43c 100644 --- a/.claude/skills/ably-new-command/references/patterns.md +++ b/.claude/skills/ably-new-command/references/patterns.md @@ -780,3 +780,20 @@ This emits two NDJSON lines in `--json` mode: | Collection result (list/get-all) | Plural noun | `keys`, `apps`, `rules`, `cursors` | The key name should match the SDK/domain terminology, not be generic. Use `message` not `data`, `cursor` not `item`. + +## Error Hints + +Add actionable hints for known Ably error codes in `src/utils/errors.ts`. `this.fail()` automatically looks up and appends these via `getFriendlyAblyErrorHint(code)`. + +**Formatting rules:** +- Use `\n` to control line wrapping in terminal output — oclif auto-wraps long lines at awkward positions, so manual `\n` gives precise control +- `\n` is automatically stripped (replaced with space) for `--json` / `--pretty-json` output, keeping hints as clean single-line strings +- Use single quotes for CLI command references (e.g., `'ably apps rules list'`) — double quotes become `\"` in JSON + +```typescript +// src/utils/errors.ts +const hints: Record = { + 93002: + "This channel requires mutableMessages to be enabled.\nRun 'ably apps rules list' to check your channel rules,\nor enable it with 'ably apps rules create' or 'ably apps rules update'.", +}; +``` diff --git a/.claude/skills/ably-review/SKILL.md b/.claude/skills/ably-review/SKILL.md index 6fd54073..9210d4d6 100644 --- a/.claude/skills/ably-review/SKILL.md +++ b/.claude/skills/ably-review/SKILL.md @@ -161,6 +161,12 @@ Apply the full checklist from the `ably-new-command` skill. These deserve the mo 2. **Grep** for new helper functions and check naming conventions (`format*` prefix for output helpers) 3. **Grep** for `this\.error\(` — should only be used inside `fail()`, not directly +### For changed error hints (`src/utils/errors.ts`) + +1. **Grep** for `"` (double quotes) inside hint strings — must use single quotes for CLI command references (double quotes become `\"` in JSON output) +2. **Check** that long hints use `\n` for line breaks — oclif auto-wraps at awkward positions, so `\n` gives control over terminal line wrapping +3. **Verify** that `this.fail()` in `src/base-command.ts` strips `\n` from hints in JSON output (`.replaceAll("\n", " ")`) + ## Step 4: Check for missing test coverage **Glob** for each new or modified command file and check if a corresponding test file exists at `test/unit/commands/`. If a command was added but no test was added, flag it. diff --git a/AGENTS.md b/AGENTS.md index 0766eb79..c559fd73 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -280,6 +280,20 @@ this.error() ← oclif exit (ONLY inside fail, nowhere else) - **`requireAppId`** returns `Promise` (not nullable) — calls `this.fail()` internally if no app found. - **`runControlCommand`** returns `Promise` (not nullable) — calls `this.fail()` internally on error. +### Error hints (`src/utils/errors.ts`) + +`getFriendlyAblyErrorHint(code)` maps Ably error codes to actionable CLI hints. `this.fail()` automatically looks up and appends these hints. + +**Line breaks in hints:** Use `\n` to control where hint text wraps in non-JSON terminal output — oclif auto-wraps long lines at awkward positions, so manual `\n` gives us control. The `\n` is automatically stripped (replaced with a space) for JSON output so the hint is a clean single-line string. Always use single quotes for CLI command references (e.g., `'ably apps rules list'`) — double quotes become `\"` in JSON output. + +```typescript +// In src/utils/errors.ts +const hints: Record = { + 93002: + "This channel requires mutableMessages to be enabled.\nRun 'ably apps rules list' to check your channel rules,\nor enable it with 'ably apps rules create' or 'ably apps rules update'.", +}; +``` + ### Additional output patterns (direct chalk, not helpers) - **No app error**: `'No app specified. Use --app flag or select an app with "ably apps switch"'` diff --git a/README.md b/README.md index 22f37a3c..1af969ed 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ $ npm install -g @ably/cli $ ably COMMAND running command... $ ably (--version) -@ably/cli/0.17.0 darwin-arm64 node-v25.3.0 +@ably/cli/0.17.0 darwin-arm64 node-v24.4.1 $ ably --help [COMMAND] USAGE $ ably COMMAND @@ -2048,25 +2048,30 @@ Publish a message to an Ably channel ``` USAGE - $ ably channels publish CHANNEL MESSAGE [-v] [--json | --pretty-json] [--client-id ] [-c ] [-d - ] [-e ] [-n ] [--transport rest|realtime] + $ ably channels publish CHANNEL MESSAGE [-v] [--json | --pretty-json] [--client-id ] [--token-size + --token-streaming] [-c ] [-d ] [-e ] [-n ] [--stream-duration ] [--transport + rest|realtime] ARGUMENTS CHANNEL The channel name to publish to MESSAGE The message to publish (JSON format or plain text) FLAGS - -c, --count= [default: 1] Number of messages to publish (default: 1) - -d, --delay= [default: 40] Delay between messages in milliseconds (default: 40ms, max 25 msgs/sec) - -e, --encoding= The encoding for the message - -n, --name= The event name (if not specified in the message JSON) - -v, --verbose Output verbose logs - --client-id= Overrides any default client ID when using API authentication. Use "none" to explicitly set - no client ID. Not applicable when using token authentication. - --json Output in JSON format - --pretty-json Output in colorized JSON format - --transport=