From 929370f3cbaecd8aff937fae114959df415562cc Mon Sep 17 00:00:00 2001 From: Tomas Zijdemans Date: Fri, 13 Mar 2026 12:55:04 +0100 Subject: [PATCH 1/5] feat(cache/unstable): add sliding expiration to `TtlCache` --- cache/ttl_cache.ts | 249 ++++++++++++++++++++++++++----- cache/ttl_cache_test.ts | 321 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 535 insertions(+), 35 deletions(-) diff --git a/cache/ttl_cache.ts b/cache/ttl_cache.ts index de3bceba144d..9d35af267cc5 100644 --- a/cache/ttl_cache.ts +++ b/cache/ttl_cache.ts @@ -3,6 +3,63 @@ import type { MemoizationCache } from "./memoize.ts"; +/** + * Options for {@linkcode TtlCache.prototype.set} when + * {@linkcode TtlCacheOptions.slidingExpiration | slidingExpiration} is + * disabled (the default). + * + * @experimental **UNSTABLE**: New API, yet to be vetted. + */ +export interface TtlCacheSetOptions { + /** + * A custom time-to-live in milliseconds for this entry. If supplied, + * overrides the cache's default TTL. Must be a finite, non-negative number. + */ + ttl?: number; +} + +/** + * Options for {@linkcode TtlCache.prototype.set} when + * {@linkcode TtlCacheOptions.slidingExpiration | slidingExpiration} is + * enabled. + * + * @experimental **UNSTABLE**: New API, yet to be vetted. + */ +export interface TtlCacheSlidingSetOptions extends TtlCacheSetOptions { + /** + * A maximum lifetime in milliseconds for this entry, measured from the + * time it is set. The sliding window cannot extend past this duration. + */ + absoluteExpiration?: number; +} + +/** + * Options for the {@linkcode TtlCache} constructor. + * + * @experimental **UNSTABLE**: New API, yet to be vetted. + * + * @typeParam Sliding Whether sliding expiration is enabled. + */ +export interface TtlCacheOptions { + /** + * Callback invoked when an entry is removed, whether by TTL expiry, + * manual deletion, or clearing the cache. + */ + onEject?: (ejectedKey: K, ejectedValue: V) => void; + /** + * When `true`, each {@linkcode TtlCache.prototype.get | get()} or + * {@linkcode TtlCache.prototype.has | has()} call resets the entry's TTL. + * + * If both `slidingExpiration` and + * {@linkcode TtlCacheSlidingSetOptions.absoluteExpiration | absoluteExpiration} + * are set on an entry, the sliding window cannot extend past the absolute + * expiration. + * + * @default {false} + */ + slidingExpiration?: Sliding; +} + /** * Time-to-live cache. * @@ -27,32 +84,34 @@ import type { MemoizationCache } from "./memoize.ts"; * assertEquals(cache.size, 0); * ``` * - * @example Adding a onEject function. + * @example Sliding expiration * ```ts * import { TtlCache } from "@std/cache/ttl-cache"; - * import { delay } from "@std/async/delay"; * import { assertEquals } from "@std/assert/equals"; + * import { FakeTime } from "@std/testing/time"; * - * const cache = new TtlCache(100, { onEject: (key, value) => { - * console.log("Revoking: ", key) - * URL.revokeObjectURL(value) - * }}) - * - * cache.set( - * "fast-url", - * URL.createObjectURL(new Blob(["Hello, World"], { type: "text/plain" })) - * ); + * using time = new FakeTime(0); + * const cache = new TtlCache(100, { + * slidingExpiration: true, + * }); * - * await delay(200) // "Revoking: fast-url" - * assertEquals(cache.get("fast-url"), undefined) + * cache.set("a", 1); + * time.now = 80; + * assertEquals(cache.get("a"), 1); // resets TTL + * time.now = 160; + * assertEquals(cache.get("a"), 1); // still alive, TTL was reset at t=80 + * time.now = 260; + * assertEquals(cache.get("a"), undefined); // expired * ``` */ -export class TtlCache extends Map +export class TtlCache extends Map implements MemoizationCache { #defaultTtl: number; #timeouts = new Map(); - - #eject: (ejectedKey: K, ejectedValue: V) => void; + #eject?: ((ejectedKey: K, ejectedValue: V) => void) | undefined; + #slidingExpiration: Sliding; + #entryTtls = new Map(); + #absoluteDeadlines = new Map(); /** * Constructs a new instance. @@ -60,17 +119,23 @@ export class TtlCache extends Map * @experimental **UNSTABLE**: New API, yet to be vetted. * * @param defaultTtl The default time-to-live in milliseconds. This value must - * be equal to or greater than 0. Its limit is determined by the current - * runtime's {@linkcode setTimeout} implementation. + * be a finite, non-negative number. Its upper limit is determined by the + * current runtime's {@linkcode setTimeout} implementation. * @param options Additional options. */ constructor( defaultTtl: number, - options?: { onEject: (ejectedKey: K, ejectedValue: V) => void }, + options?: TtlCacheOptions, ) { super(); + if (!(defaultTtl >= 0) || !Number.isFinite(defaultTtl)) { + throw new RangeError( + `Cannot create TtlCache: defaultTtl must be a finite, non-negative number: received ${defaultTtl}`, + ); + } this.#defaultTtl = defaultTtl; - this.#eject = options?.onEject ?? (() => {}); + this.#eject = options?.onEject; + this.#slidingExpiration = (options?.slidingExpiration ?? false) as Sliding; } /** @@ -78,12 +143,9 @@ export class TtlCache extends Map * * @experimental **UNSTABLE**: New API, yet to be vetted. * - * @param key The cache key - * @param value The value to set - * @param ttl A custom time-to-live. If supplied, overrides the cache's - * default TTL for this entry. This value must - * be equal to or greater than 0. Its limit is determined by the current - * runtime's {@linkcode setTimeout} implementation. + * @param key The cache key. + * @param value The value to set. + * @param options Options for this entry. * @returns `this` for chaining. * * @example Usage @@ -101,13 +163,100 @@ export class TtlCache extends Map * assertEquals(cache.get("a"), undefined); * ``` */ - override set(key: K, value: V, ttl: number = this.#defaultTtl): this { - clearTimeout(this.#timeouts.get(key)); + override set( + key: K, + value: V, + options?: Sliding extends true ? TtlCacheSlidingSetOptions + : TtlCacheSetOptions, + ): this { + const ttl = options?.ttl ?? this.#defaultTtl; + if (!(ttl >= 0) || !Number.isFinite(ttl)) { + throw new RangeError( + `Cannot set entry in TtlCache: ttl must be a finite, non-negative number: received ${ttl}`, + ); + } + + const existing = this.#timeouts.get(key); + if (existing !== undefined) clearTimeout(existing); super.set(key, value); this.#timeouts.set(key, setTimeout(() => this.delete(key), ttl)); + + if (this.#slidingExpiration) { + const slidingOptions = options as TtlCacheSlidingSetOptions | undefined; + this.#entryTtls.set(key, ttl); + if (slidingOptions?.absoluteExpiration !== undefined) { + const abs = slidingOptions.absoluteExpiration; + if (!(abs >= 0) || !Number.isFinite(abs)) { + throw new RangeError( + `Cannot set entry in TtlCache: absoluteExpiration must be a finite, non-negative number: received ${abs}`, + ); + } + this.#absoluteDeadlines.set(key, Date.now() + abs); + } else { + this.#absoluteDeadlines.delete(key); + } + } + return this; } + /** + * Gets the value associated with the specified key. + * + * @experimental **UNSTABLE**: New API, yet to be vetted. + * + * When {@linkcode TtlCacheOptions.slidingExpiration | slidingExpiration} is + * enabled, accessing an entry resets its TTL. + * + * @param key The key to get the value for. + * @returns The value associated with the specified key, or `undefined` if + * the key is not present in the cache. + * + * @example Usage + * ```ts + * import { TtlCache } from "@std/cache/ttl-cache"; + * import { assertEquals } from "@std/assert/equals"; + * + * const cache = new TtlCache(1000); + * + * cache.set("a", 1); + * assertEquals(cache.get("a"), 1); + * ``` + */ + override get(key: K): V | undefined { + if (!super.has(key)) return undefined; + if (this.#slidingExpiration) this.#resetTtl(key); + return super.get(key); + } + + /** + * Checks whether an element with the specified key exists. + * + * @experimental **UNSTABLE**: New API, yet to be vetted. + * + * When {@linkcode TtlCacheOptions.slidingExpiration | slidingExpiration} is + * enabled, checking an entry resets its TTL. + * + * @param key The key to check. + * @returns `true` if the cache contains the specified key, otherwise `false`. + * + * @example Usage + * ```ts + * import { TtlCache } from "@std/cache/ttl-cache"; + * import { assert } from "@std/assert"; + * + * const cache = new TtlCache(1000); + * + * cache.set("a", 1); + * assert(cache.has("a")); + * ``` + */ + override has(key: K): boolean { + const exists = super.has(key); + if (exists && this.#slidingExpiration) this.#resetTtl(key); + return exists; + } + /** * Deletes the value associated with the given key. * @@ -129,12 +278,17 @@ export class TtlCache extends Map * ``` */ override delete(key: K): boolean { - if (super.has(key)) { - this.#eject(key, super.get(key) as V); - } - clearTimeout(this.#timeouts.get(key)); + const value = super.get(key); + const existed = super.delete(key); + if (!existed) return false; + + const timeout = this.#timeouts.get(key); + if (timeout !== undefined) clearTimeout(timeout); this.#timeouts.delete(key); - return super.delete(key); + this.#entryTtls.delete(key); + this.#absoluteDeadlines.delete(key); + this.#eject?.(key, value!); + return true; } /** @@ -160,7 +314,19 @@ export class TtlCache extends Map clearTimeout(timeout); } this.#timeouts.clear(); + this.#entryTtls.clear(); + this.#absoluteDeadlines.clear(); + const entries = [...super.entries()]; super.clear(); + let error: unknown; + for (const [key, value] of entries) { + try { + this.#eject?.(key, value); + } catch (e) { + error ??= e; + } + } + if (error !== undefined) throw error; } /** @@ -186,4 +352,21 @@ export class TtlCache extends Map [Symbol.dispose](): void { this.clear(); } + + #resetTtl(key: K): void { + const ttl = this.#entryTtls.get(key); + if (ttl === undefined) return; + + const deadline = this.#absoluteDeadlines.get(key); + const effectiveTtl = deadline !== undefined + ? Math.min(ttl, Math.max(0, deadline - Date.now())) + : ttl; + + const existing = this.#timeouts.get(key); + if (existing !== undefined) clearTimeout(existing); + this.#timeouts.set( + key, + setTimeout(() => this.delete(key), effectiveTtl), + ); + } } diff --git a/cache/ttl_cache_test.ts b/cache/ttl_cache_test.ts index 9b868d9d2d21..8c87d890b25a 100644 --- a/cache/ttl_cache_test.ts +++ b/cache/ttl_cache_test.ts @@ -1,6 +1,6 @@ // Copyright 2018-2026 the Deno authors. MIT license. import { TtlCache } from "./ttl_cache.ts"; -import { assertEquals } from "@std/assert"; +import { assertEquals, assertThrows } from "@std/assert"; import { FakeTime } from "@std/testing/time"; const UNSET = Symbol("UNSET"); @@ -68,7 +68,7 @@ Deno.test("TtlCache deletes entries", async (t) => { const cache = new TtlCache(10); cache.set(1, "one"); - cache.set(2, "two", 3); + cache.set(2, "two", { ttl: 3 }); time.now = 1; assertEntries(cache, [[1, "one"], [2, "two"]]); @@ -143,4 +143,321 @@ Deno.test("TtlCache onEject()", async (t) => { assertEquals(ejected, [[1, 0], [2, ""], [3, false], [4, null]]); }); + + await t.step("calls onEject on clear()", () => { + const ejected: [number, string][] = []; + using cache = new TtlCache(1000, { + onEject: (k, v) => ejected.push([k, v]), + }); + + cache.set(1, "one"); + cache.set(2, "two"); + cache.set(3, "three"); + cache.clear(); + + assertEquals(ejected, [[1, "one"], [2, "two"], [3, "three"]]); + }); + + await t.step("calls onEject on [Symbol.dispose]()", () => { + const ejected: [number, string][] = []; + { + using cache = new TtlCache(1000, { + onEject: (k, v) => ejected.push([k, v]), + }); + cache.set(1, "one"); + cache.set(2, "two"); + } + + assertEquals(ejected, [[1, "one"], [2, "two"]]); + }); + + await t.step("does not call onEject when overwriting a key", () => { + const ejected: [string, number][] = []; + using cache = new TtlCache(1000, { + onEject: (k, v) => ejected.push([k, v]), + }); + + cache.set("a", 1); + cache.set("a", 2); + + assertEquals(ejected, []); + assertEquals(cache.get("a"), 2); + }); + + await t.step("entry is fully removed before onEject fires", () => { + let sizeInCallback = -1; + let hasInCallback = true; + using cache = new TtlCache(1000, { + onEject: (k) => { + sizeInCallback = cache.size; + hasInCallback = cache.has(k); + }, + }); + + cache.set("a", 1); + cache.delete("a"); + + assertEquals(sizeInCallback, 0); + assertEquals(hasInCallback, false); + }); +}); + +Deno.test("TtlCache validates TTL", async (t) => { + await t.step("constructor rejects negative defaultTtl", () => { + assertThrows( + () => new TtlCache(-1), + RangeError, + "defaultTtl must be a finite, non-negative number", + ); + }); + + await t.step("constructor rejects NaN defaultTtl", () => { + assertThrows( + () => new TtlCache(NaN), + RangeError, + "defaultTtl must be a finite, non-negative number", + ); + }); + + await t.step("constructor rejects Infinity defaultTtl", () => { + assertThrows( + () => new TtlCache(Infinity), + RangeError, + "defaultTtl must be a finite, non-negative number", + ); + }); + + await t.step("constructor accepts 0", () => { + using _cache = new TtlCache(0); + }); + + await t.step("set() rejects negative ttl", () => { + using cache = new TtlCache(1000); + assertThrows( + () => cache.set("a", 1, { ttl: -1 }), + RangeError, + "ttl must be a finite, non-negative number", + ); + }); + + await t.step("set() rejects NaN ttl", () => { + using cache = new TtlCache(1000); + assertThrows( + () => cache.set("a", 1, { ttl: NaN }), + RangeError, + "ttl must be a finite, non-negative number", + ); + }); + + await t.step("set() rejects Infinity ttl", () => { + using cache = new TtlCache(1000); + assertThrows( + () => cache.set("a", 1, { ttl: Infinity }), + RangeError, + "ttl must be a finite, non-negative number", + ); + }); + + await t.step("set() accepts 0 ttl", () => { + using cache = new TtlCache(1000); + cache.set("a", 1, { ttl: 0 }); + assertEquals(cache.get("a"), 1); + }); + + await t.step("set() rejects negative absoluteExpiration", () => { + using cache = new TtlCache(1000, { + slidingExpiration: true, + }); + assertThrows( + () => cache.set("a", 1, { absoluteExpiration: -1 }), + RangeError, + "absoluteExpiration must be a finite, non-negative number", + ); + }); + + await t.step("set() rejects NaN absoluteExpiration", () => { + using cache = new TtlCache(1000, { + slidingExpiration: true, + }); + assertThrows( + () => cache.set("a", 1, { absoluteExpiration: NaN }), + RangeError, + "absoluteExpiration must be a finite, non-negative number", + ); + }); +}); + +Deno.test("TtlCache clear() calls all onEject callbacks even if one throws", () => { + const ejected: string[] = []; + using cache = new TtlCache(1000, { + onEject: (k) => { + ejected.push(k); + if (k === "a") throw new Error("boom"); + }, + }); + + cache.set("a", 1); + cache.set("b", 2); + cache.set("c", 3); + assertThrows(() => cache.clear(), Error, "boom"); + assertEquals(ejected, ["a", "b", "c"]); + assertEquals(cache.size, 0); +}); + +Deno.test("TtlCache get() returns undefined for missing key with sliding expiration", () => { + using cache = new TtlCache(100, { + slidingExpiration: true, + }); + assertEquals(cache.get("missing"), undefined); +}); + +Deno.test("TtlCache sliding expiration", async (t) => { + await t.step("get() resets TTL", () => { + using time = new FakeTime(0); + const cache = new TtlCache(100, { + slidingExpiration: true, + }); + + cache.set("a", 1); + + time.now = 80; + assertEquals(cache.get("a"), 1); + + // TTL was reset at t=80, so entry lives until t=180 + time.now = 160; + assertEquals(cache.get("a"), 1); + + // TTL was reset at t=160, so entry lives until t=260 + time.now = 250; + assertEquals(cache.get("a"), 1); + + time.now = 350; + assertEquals(cache.get("a"), undefined); + }); + + await t.step("has() resets TTL", () => { + using time = new FakeTime(0); + const cache = new TtlCache(100, { + slidingExpiration: true, + }); + + cache.set("a", 1); + + time.now = 80; + assertEquals(cache.has("a"), true); + + time.now = 160; + assertEquals(cache.has("a"), true); + + time.now = 260; + assertEquals(cache.has("a"), false); + }); + + await t.step("does not reset TTL when slidingExpiration is false", () => { + using time = new FakeTime(0); + const cache = new TtlCache(100); + + cache.set("a", 1); + + time.now = 80; + assertEquals(cache.get("a"), 1); + + time.now = 100; + assertEquals(cache.get("a"), undefined); + }); + + await t.step("absoluteExpiration caps sliding extension", () => { + using time = new FakeTime(0); + const cache = new TtlCache(100, { + slidingExpiration: true, + }); + + cache.set("a", 1, { absoluteExpiration: 150 }); + + time.now = 80; + assertEquals(cache.get("a"), 1); + + time.now = 140; + assertEquals(cache.get("a"), 1); + + // Absolute deadline is t=150; sliding cannot extend past it + time.now = 150; + assertEquals(cache.get("a"), undefined); + }); + + await t.step( + "absoluteExpiration is a type error without slidingExpiration", + () => { + using time = new FakeTime(0); + const cache = new TtlCache(100); + + // @ts-expect-error absoluteExpiration requires slidingExpiration: true + cache.set("a", 1, { absoluteExpiration: 50 }); + + time.now = 80; + assertEquals(cache.get("a"), 1); + + time.now = 100; + assertEquals(cache.get("a"), undefined); + }, + ); + + await t.step("per-entry TTL works with sliding expiration", () => { + using time = new FakeTime(0); + const cache = new TtlCache(100, { + slidingExpiration: true, + }); + + cache.set("a", 1, { ttl: 50 }); + + time.now = 40; + assertEquals(cache.get("a"), 1); + + // TTL reset to 50ms at t=40, so alive until t=90 + time.now = 80; + assertEquals(cache.get("a"), 1); + + // TTL reset to 50ms at t=80, so alive until t=130 + time.now = 130; + assertEquals(cache.get("a"), undefined); + }); + + await t.step("sliding expiration calls onEject on expiry", () => { + using time = new FakeTime(0); + const ejected: [string, number][] = []; + const cache = new TtlCache(100, { + slidingExpiration: true, + onEject: (k, v) => ejected.push([k, v]), + }); + + cache.set("a", 1); + + time.now = 80; + cache.get("a"); + + time.now = 180; + assertEquals(ejected, [["a", 1]]); + }); + + await t.step("overwriting entry resets sliding metadata", () => { + using time = new FakeTime(0); + const cache = new TtlCache(100, { + slidingExpiration: true, + }); + + cache.set("a", 1, { ttl: 50, absoluteExpiration: 200 }); + + time.now = 40; + cache.get("a"); + + // Overwrite with different TTL and no absoluteExpiration + cache.set("a", 2, { ttl: 30 }); + + time.now = 60; + assertEquals(cache.get("a"), 2); + + // TTL reset to 30ms at t=60, alive until t=90 + time.now = 90; + assertEquals(cache.get("a"), undefined); + }); }); From 93ca72641a413936cb568feb02b0c34ebdec3af8 Mon Sep 17 00:00:00 2001 From: Tomas Zijdemans Date: Fri, 13 Mar 2026 13:16:55 +0100 Subject: [PATCH 2/5] fix doc --- cache/ttl_cache.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cache/ttl_cache.ts b/cache/ttl_cache.ts index 9d35af267cc5..19bf7c77c15a 100644 --- a/cache/ttl_cache.ts +++ b/cache/ttl_cache.ts @@ -217,7 +217,7 @@ export class TtlCache extends Map * import { TtlCache } from "@std/cache/ttl-cache"; * import { assertEquals } from "@std/assert/equals"; * - * const cache = new TtlCache(1000); + * using cache = new TtlCache(1000); * * cache.set("a", 1); * assertEquals(cache.get("a"), 1); @@ -245,7 +245,7 @@ export class TtlCache extends Map * import { TtlCache } from "@std/cache/ttl-cache"; * import { assert } from "@std/assert"; * - * const cache = new TtlCache(1000); + * using cache = new TtlCache(1000); * * cache.set("a", 1); * assert(cache.has("a")); From e8757e8f21e21897534f131246c7e51c2141348d Mon Sep 17 00:00:00 2001 From: Tomas Zijdemans Date: Fri, 13 Mar 2026 13:51:15 +0100 Subject: [PATCH 3/5] fix doc --- cache/ttl_cache.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/cache/ttl_cache.ts b/cache/ttl_cache.ts index 19bf7c77c15a..58dcc70cdbc8 100644 --- a/cache/ttl_cache.ts +++ b/cache/ttl_cache.ts @@ -69,6 +69,7 @@ export interface TtlCacheOptions { * * @typeParam K The type of the cache keys. * @typeParam V The type of the cache values. + * @typeParam Sliding Whether sliding expiration is enabled. * * @example Usage * ```ts From 01653d9b926f224b03accbe2942fbc74c6b618a4 Mon Sep 17 00:00:00 2001 From: Tomas Zijdemans Date: Sun, 15 Mar 2026 20:06:06 +0100 Subject: [PATCH 4/5] =?UTF-8?q?feat(http):=20stabilize=20=C2=B4ServerSentE?= =?UTF-8?q?ventParseStream=C2=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- http/deno.json | 2 +- ...m.ts => server_sent_event_parse_stream.ts} | 28 ++----------------- ...=> server_sent_event_parse_stream_test.ts} | 13 ++------- 3 files changed, 7 insertions(+), 36 deletions(-) rename http/{unstable_server_sent_event_stream.ts => server_sent_event_parse_stream.ts} (82%) rename http/{unstable_server_sent_event_stream_test.ts => server_sent_event_parse_stream_test.ts} (94%) diff --git a/http/deno.json b/http/deno.json index edd7e31e079a..6adc9772d09d 100644 --- a/http/deno.json +++ b/http/deno.json @@ -13,7 +13,7 @@ "./unstable-method": "./unstable_method.ts", "./negotiation": "./negotiation.ts", "./server-sent-event-stream": "./server_sent_event_stream.ts", - "./unstable-server-sent-event-stream": "./unstable_server_sent_event_stream.ts", + "./server-sent-event-parse-stream": "./server_sent_event_parse_stream.ts", "./status": "./status.ts", "./unstable-signed-cookie": "./unstable_signed_cookie.ts", "./unstable-structured-fields": "./unstable_structured_fields.ts", diff --git a/http/unstable_server_sent_event_stream.ts b/http/server_sent_event_parse_stream.ts similarity index 82% rename from http/unstable_server_sent_event_stream.ts rename to http/server_sent_event_parse_stream.ts index 74dad082368f..78d57e59048a 100644 --- a/http/unstable_server_sent_event_stream.ts +++ b/http/server_sent_event_parse_stream.ts @@ -9,8 +9,6 @@ import type { ServerSentEventMessage } from "./server_sent_event_stream.ts"; * Unlike {@linkcode ServerSentEventMessage}, the `id` field is always * a string (not `string | number`) because parsed IDs are not coerced. * - * @experimental **UNSTABLE**: New API, yet to be vetted. - * * @see {@link https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#fields} */ export type ServerSentEventParsedMessage = @@ -22,8 +20,6 @@ export type ServerSentEventParsedMessage = /** * Options for {@linkcode ServerSentEventParseStream}. - * - * @experimental **UNSTABLE**: New API, yet to be vetted. */ export interface ServerSentEventParseStreamOptions { /** @@ -49,7 +45,6 @@ function parseLine( message: ServerSentEventParsedMessage, ignoreComments: boolean, ): boolean { - // Lines starting with colon are comments if (line[0] === ":") { if (ignoreComments) return false; const value = line.slice(1); @@ -59,18 +54,15 @@ function parseLine( return true; } - // Parse field:value const colonIndex = line.indexOf(":"); let field: string; let value: string; if (colonIndex === -1) { - // No colon means field name only, empty value field = line; value = ""; } else { field = line.slice(0, colonIndex); - // Remove single leading space from value if present value = line[colonIndex + 1] === " " ? line.slice(colonIndex + 2) : line.slice(colonIndex + 1); @@ -81,27 +73,23 @@ function parseLine( message.event = value; return true; case "data": - // Accumulate data with newlines between message.data = message.data !== undefined ? `${message.data}\n${value}` : value; return true; case "id": - // Per spec: ignore if value contains null character if (!value.includes("\0")) { message.id = value; return true; } return false; case "retry": - // Per spec: only set if value consists of ASCII digits only if (/^\d+$/.test(value)) { message.retry = parseInt(value, 10); return true; } return false; default: - // Unknown fields are ignored per spec return false; } } @@ -115,11 +103,9 @@ function parseLine( * * @see {@link https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events} * - * @experimental **UNSTABLE**: New API, yet to be vetted. - * * @example Basic usage with fetch * ```ts ignore - * import { ServerSentEventParseStream } from "@std/http/unstable-server-sent-event-stream"; + * import { ServerSentEventParseStream } from "@std/http/server-sent-event-parse-stream"; * * const response = await fetch("https://example.com/sse", { * headers: { "Authorization": "Bearer token" }, @@ -135,7 +121,7 @@ function parseLine( * * @example Roundtrip with ServerSentEventStream * ```ts - * import { ServerSentEventParseStream } from "@std/http/unstable-server-sent-event-stream"; + * import { ServerSentEventParseStream } from "@std/http/server-sent-event-parse-stream"; * import { ServerSentEventStream } from "@std/http/server-sent-event-stream"; * import { assertEquals } from "@std/assert"; * @@ -156,7 +142,7 @@ function parseLine( * * @example Ignoring comments * ```ts - * import { ServerSentEventParseStream } from "@std/http/unstable-server-sent-event-stream"; + * import { ServerSentEventParseStream } from "@std/http/server-sent-event-parse-stream"; * import { assertEquals } from "@std/assert"; * * const stream = ReadableStream.from([ @@ -189,21 +175,17 @@ export class ServerSentEventParseStream transform(chunk, controller) { buffer += decoder.decode(chunk, { stream: true }); - // Preserve trailing \r - it might be part of \r\n split across chunks let trailingCR = ""; if (buffer[buffer.length - 1] === "\r") { trailingCR = "\r"; buffer = buffer.slice(0, -1); } - // Process complete lines const lines = buffer.split(NEWLINE_REGEXP); - // Keep incomplete last line in buffer, restore any trailing \r buffer = lines.pop()! + trailingCR; for (const line of lines) { if (line === "") { - // Empty line signals end of message - dispatch if non-empty if (hasFields) { controller.enqueue(message); } @@ -216,16 +198,13 @@ export class ServerSentEventParseStream }, flush(controller) { - // Handle any remaining content in buffer buffer += decoder.decode(); - // Trailing \r at end of stream is a line ending if (buffer[buffer.length - 1] === "\r") { buffer = buffer.slice(0, -1); if (parseLine(buffer, message, ignoreComments)) { hasFields = true; } - // The \r was a line ending, so dispatch if we have fields if (hasFields) { controller.enqueue(message); return; @@ -236,7 +215,6 @@ export class ServerSentEventParseStream } } - // Dispatch final message if non-empty if (hasFields) { controller.enqueue(message); } diff --git a/http/unstable_server_sent_event_stream_test.ts b/http/server_sent_event_parse_stream_test.ts similarity index 94% rename from http/unstable_server_sent_event_stream_test.ts rename to http/server_sent_event_parse_stream_test.ts index f66f7db80e84..78e68991ce2a 100644 --- a/http/unstable_server_sent_event_stream_test.ts +++ b/http/server_sent_event_parse_stream_test.ts @@ -3,7 +3,7 @@ import { assertEquals } from "@std/assert"; import { type ServerSentEventParsedMessage, ServerSentEventParseStream, -} from "./unstable_server_sent_event_stream.ts"; +} from "./server_sent_event_parse_stream.ts"; import { type ServerSentEventMessage, ServerSentEventStream, @@ -167,7 +167,6 @@ Deno.test("ServerSentEventParseStream handles mixed line endings", async () => { }); Deno.test("ServerSentEventParseStream handles CRLF split across chunks", async () => { - // \r at end of chunk 1, \n at start of chunk 2 should be ONE line ending const result = await Array.fromAsync(parseStream([ ":comment\r", "\ndata: hello\n\n", @@ -177,7 +176,6 @@ Deno.test("ServerSentEventParseStream handles CRLF split across chunks", async ( }); Deno.test("ServerSentEventParseStream handles standalone CR at chunk boundary", async () => { - // \r at end of chunk 1 followed by non-\n should be a line ending const result = await Array.fromAsync(parseStream([ "data: one\r", "data: two\n\n", @@ -235,7 +233,6 @@ Deno.test("ServerSentEventParseStream handles empty data value", async () => { }); Deno.test("ServerSentEventParseStream handles data with leading space preserved", async () => { - // Only ONE space after colon is removed const result = await Array.fromAsync(parseStream([ "data: two spaces\n\n", ])); @@ -244,7 +241,6 @@ Deno.test("ServerSentEventParseStream handles data with leading space preserved" }); Deno.test("ServerSentEventParseStream handles flush with pending message", async () => { - // No trailing newlines - message should be dispatched on flush const result = await Array.fromAsync(parseStream([ "data: final", ])); @@ -305,12 +301,9 @@ Deno.test("ServerSentEventParseStream handles stream with only whitespace", asyn }); Deno.test("ServerSentEventParseStream handles multi-byte UTF-8 split across chunks", async () => { - // "data: 🦕\n\n" - the dinosaur emoji is 4 bytes (F0 9F A6 95) - // Split it in the middle of the emoji const full = new TextEncoder().encode("data: 🦕\n\n"); - // Split after "data: " and first 2 bytes of emoji - const chunk1 = full.slice(0, 8); // "data: " (6) + first 2 bytes of emoji - const chunk2 = full.slice(8); // last 2 bytes of emoji + "\n\n" + const chunk1 = full.slice(0, 8); + const chunk2 = full.slice(8); const stream = ReadableStream.from([chunk1, chunk2]) .pipeThrough(new ServerSentEventParseStream()); From 46dea6c4e4073926e093e6fd4e29364be34f120f Mon Sep 17 00:00:00 2001 From: Tomas Zijdemans Date: Sun, 15 Mar 2026 20:09:08 +0100 Subject: [PATCH 5/5] =?UTF-8?q?Revert=20"feat(http):=20stabilize=20=C2=B4S?= =?UTF-8?q?erverSentEventParseStream=C2=B4"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 01653d9b926f224b03accbe2942fbc74c6b618a4. --- http/deno.json | 2 +- ...s => unstable_server_sent_event_stream.ts} | 28 +++++++++++++++++-- ...unstable_server_sent_event_stream_test.ts} | 13 +++++++-- 3 files changed, 36 insertions(+), 7 deletions(-) rename http/{server_sent_event_parse_stream.ts => unstable_server_sent_event_stream.ts} (82%) rename http/{server_sent_event_parse_stream_test.ts => unstable_server_sent_event_stream_test.ts} (94%) diff --git a/http/deno.json b/http/deno.json index 6adc9772d09d..edd7e31e079a 100644 --- a/http/deno.json +++ b/http/deno.json @@ -13,7 +13,7 @@ "./unstable-method": "./unstable_method.ts", "./negotiation": "./negotiation.ts", "./server-sent-event-stream": "./server_sent_event_stream.ts", - "./server-sent-event-parse-stream": "./server_sent_event_parse_stream.ts", + "./unstable-server-sent-event-stream": "./unstable_server_sent_event_stream.ts", "./status": "./status.ts", "./unstable-signed-cookie": "./unstable_signed_cookie.ts", "./unstable-structured-fields": "./unstable_structured_fields.ts", diff --git a/http/server_sent_event_parse_stream.ts b/http/unstable_server_sent_event_stream.ts similarity index 82% rename from http/server_sent_event_parse_stream.ts rename to http/unstable_server_sent_event_stream.ts index 78d57e59048a..74dad082368f 100644 --- a/http/server_sent_event_parse_stream.ts +++ b/http/unstable_server_sent_event_stream.ts @@ -9,6 +9,8 @@ import type { ServerSentEventMessage } from "./server_sent_event_stream.ts"; * Unlike {@linkcode ServerSentEventMessage}, the `id` field is always * a string (not `string | number`) because parsed IDs are not coerced. * + * @experimental **UNSTABLE**: New API, yet to be vetted. + * * @see {@link https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#fields} */ export type ServerSentEventParsedMessage = @@ -20,6 +22,8 @@ export type ServerSentEventParsedMessage = /** * Options for {@linkcode ServerSentEventParseStream}. + * + * @experimental **UNSTABLE**: New API, yet to be vetted. */ export interface ServerSentEventParseStreamOptions { /** @@ -45,6 +49,7 @@ function parseLine( message: ServerSentEventParsedMessage, ignoreComments: boolean, ): boolean { + // Lines starting with colon are comments if (line[0] === ":") { if (ignoreComments) return false; const value = line.slice(1); @@ -54,15 +59,18 @@ function parseLine( return true; } + // Parse field:value const colonIndex = line.indexOf(":"); let field: string; let value: string; if (colonIndex === -1) { + // No colon means field name only, empty value field = line; value = ""; } else { field = line.slice(0, colonIndex); + // Remove single leading space from value if present value = line[colonIndex + 1] === " " ? line.slice(colonIndex + 2) : line.slice(colonIndex + 1); @@ -73,23 +81,27 @@ function parseLine( message.event = value; return true; case "data": + // Accumulate data with newlines between message.data = message.data !== undefined ? `${message.data}\n${value}` : value; return true; case "id": + // Per spec: ignore if value contains null character if (!value.includes("\0")) { message.id = value; return true; } return false; case "retry": + // Per spec: only set if value consists of ASCII digits only if (/^\d+$/.test(value)) { message.retry = parseInt(value, 10); return true; } return false; default: + // Unknown fields are ignored per spec return false; } } @@ -103,9 +115,11 @@ function parseLine( * * @see {@link https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events} * + * @experimental **UNSTABLE**: New API, yet to be vetted. + * * @example Basic usage with fetch * ```ts ignore - * import { ServerSentEventParseStream } from "@std/http/server-sent-event-parse-stream"; + * import { ServerSentEventParseStream } from "@std/http/unstable-server-sent-event-stream"; * * const response = await fetch("https://example.com/sse", { * headers: { "Authorization": "Bearer token" }, @@ -121,7 +135,7 @@ function parseLine( * * @example Roundtrip with ServerSentEventStream * ```ts - * import { ServerSentEventParseStream } from "@std/http/server-sent-event-parse-stream"; + * import { ServerSentEventParseStream } from "@std/http/unstable-server-sent-event-stream"; * import { ServerSentEventStream } from "@std/http/server-sent-event-stream"; * import { assertEquals } from "@std/assert"; * @@ -142,7 +156,7 @@ function parseLine( * * @example Ignoring comments * ```ts - * import { ServerSentEventParseStream } from "@std/http/server-sent-event-parse-stream"; + * import { ServerSentEventParseStream } from "@std/http/unstable-server-sent-event-stream"; * import { assertEquals } from "@std/assert"; * * const stream = ReadableStream.from([ @@ -175,17 +189,21 @@ export class ServerSentEventParseStream transform(chunk, controller) { buffer += decoder.decode(chunk, { stream: true }); + // Preserve trailing \r - it might be part of \r\n split across chunks let trailingCR = ""; if (buffer[buffer.length - 1] === "\r") { trailingCR = "\r"; buffer = buffer.slice(0, -1); } + // Process complete lines const lines = buffer.split(NEWLINE_REGEXP); + // Keep incomplete last line in buffer, restore any trailing \r buffer = lines.pop()! + trailingCR; for (const line of lines) { if (line === "") { + // Empty line signals end of message - dispatch if non-empty if (hasFields) { controller.enqueue(message); } @@ -198,13 +216,16 @@ export class ServerSentEventParseStream }, flush(controller) { + // Handle any remaining content in buffer buffer += decoder.decode(); + // Trailing \r at end of stream is a line ending if (buffer[buffer.length - 1] === "\r") { buffer = buffer.slice(0, -1); if (parseLine(buffer, message, ignoreComments)) { hasFields = true; } + // The \r was a line ending, so dispatch if we have fields if (hasFields) { controller.enqueue(message); return; @@ -215,6 +236,7 @@ export class ServerSentEventParseStream } } + // Dispatch final message if non-empty if (hasFields) { controller.enqueue(message); } diff --git a/http/server_sent_event_parse_stream_test.ts b/http/unstable_server_sent_event_stream_test.ts similarity index 94% rename from http/server_sent_event_parse_stream_test.ts rename to http/unstable_server_sent_event_stream_test.ts index 78e68991ce2a..f66f7db80e84 100644 --- a/http/server_sent_event_parse_stream_test.ts +++ b/http/unstable_server_sent_event_stream_test.ts @@ -3,7 +3,7 @@ import { assertEquals } from "@std/assert"; import { type ServerSentEventParsedMessage, ServerSentEventParseStream, -} from "./server_sent_event_parse_stream.ts"; +} from "./unstable_server_sent_event_stream.ts"; import { type ServerSentEventMessage, ServerSentEventStream, @@ -167,6 +167,7 @@ Deno.test("ServerSentEventParseStream handles mixed line endings", async () => { }); Deno.test("ServerSentEventParseStream handles CRLF split across chunks", async () => { + // \r at end of chunk 1, \n at start of chunk 2 should be ONE line ending const result = await Array.fromAsync(parseStream([ ":comment\r", "\ndata: hello\n\n", @@ -176,6 +177,7 @@ Deno.test("ServerSentEventParseStream handles CRLF split across chunks", async ( }); Deno.test("ServerSentEventParseStream handles standalone CR at chunk boundary", async () => { + // \r at end of chunk 1 followed by non-\n should be a line ending const result = await Array.fromAsync(parseStream([ "data: one\r", "data: two\n\n", @@ -233,6 +235,7 @@ Deno.test("ServerSentEventParseStream handles empty data value", async () => { }); Deno.test("ServerSentEventParseStream handles data with leading space preserved", async () => { + // Only ONE space after colon is removed const result = await Array.fromAsync(parseStream([ "data: two spaces\n\n", ])); @@ -241,6 +244,7 @@ Deno.test("ServerSentEventParseStream handles data with leading space preserved" }); Deno.test("ServerSentEventParseStream handles flush with pending message", async () => { + // No trailing newlines - message should be dispatched on flush const result = await Array.fromAsync(parseStream([ "data: final", ])); @@ -301,9 +305,12 @@ Deno.test("ServerSentEventParseStream handles stream with only whitespace", asyn }); Deno.test("ServerSentEventParseStream handles multi-byte UTF-8 split across chunks", async () => { + // "data: 🦕\n\n" - the dinosaur emoji is 4 bytes (F0 9F A6 95) + // Split it in the middle of the emoji const full = new TextEncoder().encode("data: 🦕\n\n"); - const chunk1 = full.slice(0, 8); - const chunk2 = full.slice(8); + // Split after "data: " and first 2 bytes of emoji + const chunk1 = full.slice(0, 8); // "data: " (6) + first 2 bytes of emoji + const chunk2 = full.slice(8); // last 2 bytes of emoji + "\n\n" const stream = ReadableStream.from([chunk1, chunk2]) .pipeThrough(new ServerSentEventParseStream());