diff --git a/script/node-polyfills.ts b/script/node-polyfills.ts index 12fb360dc..170b829c4 100644 --- a/script/node-polyfills.ts +++ b/script/node-polyfills.ts @@ -8,6 +8,10 @@ import { } from "node:child_process"; import { statSync } from "node:fs"; import { access, readFile, stat, writeFile } from "node:fs/promises"; +import { promisify } from "node:util"; +// biome-ignore lint/performance/noNamespaceImport: runtime access to optional `zstdCompress`/`zstdDecompress` exports +import * as zlibModule from "node:zlib"; +import { constants as zlibConstants } from "node:zlib"; // node:sqlite is imported lazily inside NodeDatabasePolyfill to avoid // crashing on Node.js versions without node:sqlite support when the // bundle is loaded as a library (the consumer may never use SQLite). @@ -284,4 +288,66 @@ const BunPolyfill = { }, }; +/** + * Coerce arbitrary compression input (string, ArrayBuffer, TypedArray) + * into a contiguous Buffer without copying where possible. + */ +function toBufferForCompression( + data: NodeJS.TypedArray | Buffer | string | ArrayBuffer +): Buffer { + if (typeof data === "string") { + return Buffer.from(data, "utf-8"); + } + if (data instanceof ArrayBuffer) { + return Buffer.from(data); + } + return Buffer.from(data.buffer, data.byteOffset, data.byteLength); +} + +// Feature-detected zstd polyfill. `node:zlib.zstdCompress` lands in Node +// 22.15; we do NOT install the polyfill below on older Node because +// Bun.zstdCompress's absence lets the telemetry transport's runtime +// probe fall back to gzip cleanly (see src/lib/telemetry/zstd-transport.ts). +// +// Kept out of the BunPolyfill literal so the `typeof Bun.zstdCompress` +// probe genuinely returns `"undefined"` on older runtimes — assigning +// a noop stub would defeat the feature-detect. 
+const zlibOptionalZstdCompress = (zlibModule as { zstdCompress?: unknown })
+  .zstdCompress;
+const zlibOptionalZstdDecompress = (zlibModule as { zstdDecompress?: unknown })
+  .zstdDecompress;
+
+if (
+  typeof zlibOptionalZstdCompress === "function" &&
+  typeof zlibOptionalZstdDecompress === "function"
+) {
+  const nodeZstdCompress = promisify(
+    zlibOptionalZstdCompress as (
+      buf: Buffer,
+      options: { params?: Record<number, number> },
+      cb: (err: Error | null, result: Buffer) => void
+    ) => void
+  );
+  const nodeZstdDecompress = promisify(
+    zlibOptionalZstdDecompress as (
+      buf: Buffer,
+      cb: (err: Error | null, result: Buffer) => void
+    ) => void
+  );
+
+  (BunPolyfill as unknown as { zstdCompress: unknown }).zstdCompress = (
+    data: NodeJS.TypedArray | Buffer | string | ArrayBuffer,
+    opts?: { level?: number }
+  ): Promise<Buffer> =>
+    nodeZstdCompress(toBufferForCompression(data), {
+      params: {
+        [zlibConstants.ZSTD_c_compressionLevel]: opts?.level ?? 3,
+      },
+    });
+
+  (BunPolyfill as unknown as { zstdDecompress: unknown }).zstdDecompress = (
+    data: NodeJS.TypedArray | Buffer | string | ArrayBuffer
+  ): Promise<Buffer> => nodeZstdDecompress(toBufferForCompression(data));
+}
+
 globalThis.Bun = BunPolyfill as typeof Bun;
diff --git a/src/lib/telemetry.ts b/src/lib/telemetry.ts
index 4a0519cae..2c10bd682 100644
--- a/src/lib/telemetry.ts
+++ b/src/lib/telemetry.ts
@@ -30,6 +30,7 @@ import {
 import { ApiError } from "./errors.js";
 import { attachSentryReporter, logger } from "./logger.js";
 import { getSentryBaseUrl, isSentrySaasUrl } from "./sentry-urls.js";
+import { makeCompressedTransport } from "./telemetry/zstd-transport.js";
 import { getRealUsername } from "./utils.js";
 
 export type { Span } from "@sentry/core";
@@ -518,6 +519,11 @@ export function initSentry(
   const client = Sentry.init({
     dsn: SENTRY_CLI_DSN,
     enabled,
+    // Compress outgoing envelopes with zstd (level 3) instead of gzip —
+    // smaller payloads, faster compress/decompress on both sides.
+ // Automatic gzip fallback when running on Node < 22.15 without the + // `Bun.zstdCompress` polyfill (see script/node-polyfills.ts). + transport: makeCompressedTransport, // Keep default integrations but filter out ones that add overhead without benefit. // Important: Don't use defaultIntegrations: false as it may break debug ID support. // NodeSystemError is excluded on runtimes missing util.getSystemErrorMap (Bun) — CLI-K1. diff --git a/src/lib/telemetry/zstd-transport.ts b/src/lib/telemetry/zstd-transport.ts new file mode 100644 index 000000000..6b8151316 --- /dev/null +++ b/src/lib/telemetry/zstd-transport.ts @@ -0,0 +1,348 @@ +/** + * Custom Sentry SDK transport factory with zstd-first compression. + * + * Wraps `createTransport` from @sentry/core with a custom request executor + * that compresses outgoing envelope bodies with zstd level 3 (libzstd + * default). When running on a host without zstd support (Node < 22.15 + * without the polyfill installed), falls back to gzip — matches the + * SDK's default behavior byte-for-byte so there's no regression. + * + * Codec selection is one-shot, performed at factory-construction time. + * No per-request branching: if `Bun.zstdCompress` is available when the + * transport is created, every envelope uses zstd; otherwise every + * envelope uses gzip. + * + * This mirrors `@sentry/node-core/transports/http.js` `makeNodeTransport` + * — URL parsing, `no_proxy` handling, proxy agent, CA certs, keepAlive, + * IPv6 hostname unwrapping, rate-limit response header normalization — + * but swaps out the gzip-via-stream-pipe for an async one-shot compress + * that sets the correct `Content-Encoding`. + * + * Why not patch the SDK: + * `Sentry.init({ transport })` is a first-class extension point on + * the Client. 
Going via a custom factory avoids patch-file maintenance + * across SDK upgrades and avoids the gzip→gunzip→zstd waste that an + * `httpModule` shim would incur (the SDK has already piped the body + * through `createGzip()` by the time `httpModule.request()` runs). + */ + +// biome-ignore lint/performance/noNamespaceImport: http module must be passed as a namespace object (matches SDK's HTTPModule interface) +import * as http from "node:http"; +// biome-ignore lint/performance/noNamespaceImport: same as above for https +import * as https from "node:https"; +import { Readable } from "node:stream"; +import { promisify } from "node:util"; +import { gzip as gzipCb } from "node:zlib"; +import { + createTransport, + suppressTracing, + type Transport, + type TransportMakeRequestResponse, + type TransportRequest, + type TransportRequestExecutor, +} from "@sentry/core"; +import { + makeNodeTransport, + type NodeTransportOptions, +} from "@sentry/node-core/light"; + +/** Codec actually applied to a given envelope. */ +type AppliedEncoding = "zstd" | "gzip" | "none"; + +/** Codec the transport will attempt; "none" only happens under threshold. */ +type SelectedEncoding = "zstd" | "gzip"; + +/** + * zstd compression level. L3 is libzstd's default and was confirmed + * optimal for telemetry-sized payloads (1–30 KiB) by an offline + * benchmark before merge: L3–L6 sit on the same ratio-vs-time curve, + * and L3 wins on compress time without losing ratio. Higher levels + * (≥9) regress compress time without meaningful ratio gains. + */ +const ZSTD_LEVEL = 3; + +/** + * Minimum body length above which we attempt compression. + * + * For zstd we lower this from the SDK's 32 KiB gzip threshold to 1 KiB + * — Bun's zstd worker is cheap to dispatch and most error envelopes + * (5–15 KiB) would miss the 32 KiB cutoff and ship uncompressed + * otherwise. + */ +const ZSTD_THRESHOLD = 1024; + +/** + * Matches the SDK default. 
Kept identical to avoid any byte-level
+ * regression when the zstd fast path is unavailable.
+ */
+const GZIP_THRESHOLD = 1024 * 32;
+
+/**
+ * Shape of the globalThis.Bun subset we rely on. Bun's real types
+ * declare this, but the transport also runs under Node (via the
+ * feature-detected polyfill in `script/node-polyfills.ts`) where only
+ * a subset of Bun APIs are installed.
+ */
+type BunZstdHost = {
+  zstdCompress?: (
+    data: Uint8Array | Buffer | string | ArrayBuffer,
+    options?: { level?: number }
+  ) => Promise<Uint8Array>;
+};
+
+const gzipAsync = promisify(gzipCb);
+
+/**
+ * Factory for the SDK's `Sentry.init({ transport })` option.
+ *
+ * Falls back to `makeNodeTransport` when a proxy is configured (the SDK
+ * owns CONNECT tunneling) or when the DSN URL is unparseable. Otherwise
+ * picks a one-shot codec — zstd if available, gzip otherwise — and
+ * wires up an executor.
+ */
+export function makeCompressedTransport(
+  options: NodeTransportOptions
+): Transport {
+  let urlSegments: URL;
+  try {
+    urlSegments = new URL(options.url);
+  } catch {
+    return createTransport(options, () => Promise.resolve({}));
+  }
+
+  if (shouldFallbackToDefault(urlSegments, options)) {
+    return makeNodeTransport(options);
+  }
+
+  const isHttps = urlSegments.protocol === "https:";
+  const nativeHttpModule = isHttps ? https : http;
+  const keepAlive = options.keepAlive ?? false;
+  const agent = new nativeHttpModule.Agent({
+    keepAlive,
+    maxSockets: 30,
+    timeout: 2000,
+  });
+  const httpModule = options.httpModule ?? nativeHttpModule;
+  const encoding: SelectedEncoding = hasZstdSupport() ? "zstd" : "gzip";
+  const hostnameIsIPv6 = urlSegments.hostname.startsWith("[");
+  const hostname = hostnameIsIPv6
+    ? urlSegments.hostname.slice(1, -1)
+    : urlSegments.hostname;
+  const path = `${urlSegments.pathname}${urlSegments.search}`;
+
+  const executor: TransportRequestExecutor = (request: TransportRequest) =>
+    new Promise((resolve, reject) => {
+      suppressTracing(() => {
+        performRequest({
+          request,
+          options,
+          httpModule,
+          agent,
+          encoding,
+          hostname,
+          path,
+          port: urlSegments.port,
+          protocol: urlSegments.protocol,
+        })
+          .then(resolve)
+          .catch(reject);
+      });
+    });
+
+  return createTransport(options, executor);
+}
+
+/**
+ * True iff a proxy is configured for this URL and not exempted by
+ * no_proxy. When true, the caller falls back to the SDK's default
+ * transport (which handles CONNECT tunneling).
+ *
+ * Mirrors `@sentry/node-core/transports/http.js` `applyNoProxyOption`'s
+ * proxy-resolution priority:
+ *   - http  → `options.proxy` | `http_proxy`
+ *   - https → `options.proxy` | `https_proxy` | `http_proxy`
+ *
+ * Both upper- and lowercase env vars are recognized so behavior matches
+ * cURL / Node ecosystem convention. Lowercase wins when both are set,
+ * staying consistent with the SDK and {@link isNoProxyExempt}.
+ *
+ * @internal Exported for tests.
+ */
+export function shouldFallbackToDefault(
+  url: URL,
+  options: NodeTransportOptions
+): boolean {
+  const isHttps = url.protocol === "https:";
+  const httpProxy = process.env.http_proxy ?? process.env.HTTP_PROXY;
+  const httpsProxy = process.env.https_proxy ?? process.env.HTTPS_PROXY;
+  const envProxy = isHttps ? httpsProxy : httpProxy;
+  // SDK precedent: HTTPS falls back to http_proxy as a last resort.
+  const proxy = options.proxy || envProxy || httpProxy;
+  if (!proxy) {
+    return false;
+  }
+  return !isNoProxyExempt(url);
+}
+
+type PerformRequestArgs = {
+  request: TransportRequest;
+  options: NodeTransportOptions;
+  httpModule: NonNullable<NodeTransportOptions["httpModule"]>;
+  agent: http.Agent;
+  encoding: SelectedEncoding;
+  hostname: string;
+  path: string;
+  port: string;
+  protocol: string;
+};
+
+async function performRequest(
+  args: PerformRequestArgs
+): Promise<TransportMakeRequestResponse> {
+  const { request, options, httpModule, agent, encoding } = args;
+
+  const rawBuffer = normalizeBody(request.body);
+  const { payload, encodingApplied } = await maybeCompress(rawBuffer, encoding);
+
+  const headers: Record<string, string> = { ...(options.headers ?? {}) };
+  if (encodingApplied !== "none") {
+    headers["content-encoding"] = encodingApplied;
+  }
+
+  return new Promise((resolve, reject) => {
+    const req = httpModule.request(
+      {
+        method: "POST",
+        agent,
+        headers,
+        hostname: args.hostname,
+        path: args.path,
+        port: args.port,
+        protocol: args.protocol,
+        ca: options.caCerts,
+      },
+      (res) => {
+        // Drain the response body
+        res.on("data", drain);
+        res.on("end", drain);
+        res.setEncoding("utf8");
+        const retryAfterHeader = res.headers["retry-after"] ?? null;
+        const rateLimitsHeader = res.headers["x-sentry-rate-limits"] ?? null;
+        resolve({
+          statusCode: res.statusCode,
+          headers: {
+            "retry-after": Array.isArray(retryAfterHeader)
+              ? (retryAfterHeader[0] ?? null)
+              : retryAfterHeader,
+            "x-sentry-rate-limits": Array.isArray(rateLimitsHeader)
+              ? (rateLimitsHeader[0] ?? null)
+              : rateLimitsHeader,
+          },
+        });
+      }
+    );
+    req.on("error", reject);
+    Readable.from(payload).pipe(req);
+  });
+}
+
+/** No-op used to drain HTTP response bodies. */
+function drain(): void {
+  // intentionally empty
+}
+
+/**
+ * Coerce `string | Uint8Array` into a contiguous Buffer (zero-copy for
+ * Uint8Array; UTF-8 encoded for strings).
+ *
+ * @internal Exported for tests.
+ */
+export function normalizeBody(body: string | Uint8Array): Buffer {
+  if (typeof body === "string") {
+    return Buffer.from(body, "utf-8");
+  }
+  return Buffer.from(body.buffer, body.byteOffset, body.byteLength);
+}
+
+type CompressResult = {
+  payload: Buffer;
+  encodingApplied: AppliedEncoding;
+};
+
+/**
+ * Apply the pre-selected codec iff the body is large enough to benefit.
+ * Under threshold → passthrough with `encoding: "none"` (matches SDK
+ * default behavior).
+ *
+ * @internal Exported for tests.
+ */
+export async function maybeCompress(
+  buf: Buffer,
+  encoding: SelectedEncoding
+): Promise<CompressResult> {
+  const threshold = encoding === "zstd" ? ZSTD_THRESHOLD : GZIP_THRESHOLD;
+  if (buf.length <= threshold) {
+    return { payload: buf, encodingApplied: "none" };
+  }
+
+  if (encoding === "zstd") {
+    // Belt-and-braces — `Bun` may have been swapped out between
+    // construction and first send. Fall through to gzip if so, but
+    // re-apply the gzip threshold so mid-sized bodies (1-32 KiB) don't
+    // get compressed when the SDK default would have shipped them raw.
+    const bun = (globalThis as { Bun?: BunZstdHost }).Bun;
+    if (!bun?.zstdCompress) {
+      if (buf.length <= GZIP_THRESHOLD) {
+        return { payload: buf, encodingApplied: "none" };
+      }
+      const gz = await gzipAsync(buf);
+      return { payload: gz, encodingApplied: "gzip" };
+    }
+    const out = await bun.zstdCompress(buf, { level: ZSTD_LEVEL });
+    return {
+      payload: Buffer.from(out.buffer, out.byteOffset, out.byteLength),
+      encodingApplied: "zstd",
+    };
+  }
+
+  const gz = await gzipAsync(buf);
+  return { payload: gz, encodingApplied: "gzip" };
+}
+
+/** Feature-detect zstd support on the current runtime.
*/ +export function hasZstdSupport(): boolean { + const bun = (globalThis as { Bun?: BunZstdHost }).Bun; + return typeof bun?.zstdCompress === "function"; +} + +/** + * Mirror the SDK's `applyNoProxyOption`: returns true iff the target + * URL matches an entry in `NO_PROXY` / `no_proxy`, in which case the + * proxy should be ignored. + * + * Slightly more permissive than the SDK: + * - Whitespace around comma-separated entries is trimmed + * (`"a.com, b.com"` is common; SDK does not trim). + * - The `"*"` wildcard means "bypass proxy for all hosts" — a + * convention from cURL / Go tooling that the SDK currently + * ignores (would route through the proxy regardless). We honor + * it so users with `no_proxy="*"` keep the zstd path. + * + * @internal Exported for tests. + */ +export function isNoProxyExempt(urlSegments: URL): boolean { + const noProxy = process.env.no_proxy ?? process.env.NO_PROXY; + if (!noProxy) { + return false; + } + const entries = noProxy + .split(",") + .map((ex) => ex.trim()) + .filter((ex) => ex.length > 0); + if (entries.includes("*")) { + return true; + } + return entries.some( + (ex) => urlSegments.host.endsWith(ex) || urlSegments.hostname.endsWith(ex) + ); +} diff --git a/test/lib/telemetry/zstd-transport.e2e.test.ts b/test/lib/telemetry/zstd-transport.e2e.test.ts new file mode 100644 index 000000000..0818eee7e --- /dev/null +++ b/test/lib/telemetry/zstd-transport.e2e.test.ts @@ -0,0 +1,145 @@ +/** + * E2E tests for the zstd transport. + * + * Spins up a real `http.createServer` on `127.0.0.1:0`, points the + * transport at it, and verifies the wire-level behavior: the request + * body is zstd-compressed, the `Content-Encoding` header is correct, + * and the body decompresses back to a valid envelope. 
+ */
+
+import { afterEach, describe, expect, test } from "bun:test";
+import { createServer, type IncomingMessage, type Server } from "node:http";
+import type { AddressInfo } from "node:net";
+import { createEnvelope } from "@sentry/core";
+import {
+  hasZstdSupport,
+  makeCompressedTransport,
+} from "../../../src/lib/telemetry/zstd-transport.js";
+
+/** No-op for SDK callbacks that require a function but return nothing meaningful. */
+function noop(): void {
+  // intentionally empty
+}
+
+type CapturedRequest = {
+  headers: IncomingMessage["headers"];
+  body: Buffer;
+};
+
+/**
+ * Track every server we start so they can be closed in teardown.
+ * Without this, a `let server` shared across tests is overwritten by
+ * the second test before the first one is closed — silent socket leak.
+ */
+const startedServers: Server[] = [];
+
+function startMockIngest(
+  responder: (req: IncomingMessage) => {
+    statusCode: number;
+    headers?: Record<string, string>;
+  }
+): Promise<{
+  url: string;
+  captures: CapturedRequest[];
+}> {
+  const captures: CapturedRequest[] = [];
+  const server = createServer((req, res) => {
+    const chunks: Buffer[] = [];
+    req.on("data", (c: Buffer) => chunks.push(c));
+    req.on("end", () => {
+      captures.push({ headers: req.headers, body: Buffer.concat(chunks) });
+      const response = responder(req);
+      res.writeHead(response.statusCode, response.headers ?? {});
+      res.end();
+    });
+  });
+  startedServers.push(server);
+
+  return new Promise((resolve) => {
+    server.listen(0, "127.0.0.1", () => {
+      const addr = server.address() as AddressInfo;
+      resolve({
+        url: `http://127.0.0.1:${addr.port}/api/0/envelope/`,
+        captures,
+      });
+    });
+  });
+}
+
+describe("makeCompressedTransport (e2e)", () => {
+  afterEach(async () => {
+    // Close every server started in this test, in parallel. Lets the
+    // event loop drain naturally instead of relying on process exit.
+    await Promise.all(
+      startedServers.splice(0).map(
+        (s) =>
+          new Promise<void>((resolve) => {
+            s.close(() => resolve());
+          })
+      )
+    );
+  });
+
+  test("sends zstd-encoded envelope; server decompresses back to original", async () => {
+    if (!hasZstdSupport()) {
+      return;
+    }
+
+    const { url, captures } = await startMockIngest(() => ({
+      statusCode: 200,
+    }));
+
+    const transport = makeCompressedTransport({
+      url,
+      recordDroppedEvent: noop,
+    });
+
+    // Sizable envelope — above the 1 KiB zstd threshold.
+    const message = "x".repeat(4096);
+    const envelope: any = createEnvelope({ event_id: "abc" } as any, [
+      [{ type: "event" } as any, { message } as any],
+    ]);
+
+    const response = await transport.send(envelope);
+    expect(response.statusCode).toBe(200);
+
+    expect(captures).toHaveLength(1);
+    expect(captures[0]?.headers["content-encoding"]).toBe("zstd");
+
+    const decompressed = await Bun.zstdDecompress(captures[0]!.body);
+    const text = Buffer.from(
+      decompressed.buffer,
+      decompressed.byteOffset,
+      decompressed.byteLength
+    ).toString("utf-8");
+    expect(text).toContain(message);
+    expect(text).toContain('"type":"event"');
+  });
+
+  test("rate-limit headers flow back into createTransport wrapper", async () => {
+    const { url, captures } = await startMockIngest(() => ({
+      statusCode: 429,
+      headers: {
+        "retry-after": "60",
+        "x-sentry-rate-limits": "60:error:organization",
+      },
+    }));
+
+    const transport = makeCompressedTransport({
+      url,
+      recordDroppedEvent: noop,
+    });
+
+    const envelope: any = createEnvelope({ event_id: "a" } as any, [
+      [{ type: "event" } as any, { message: "hi" } as any],
+    ]);
+    const response = await transport.send(envelope);
+
+    expect(response.statusCode).toBe(429);
+    expect(response.headers?.["retry-after"]).toBe("60");
+    expect(response.headers?.["x-sentry-rate-limits"]).toBe(
+      "60:error:organization"
+    );
+    expect(captures).toHaveLength(1);
+  });
+});
diff --git a/test/lib/telemetry/zstd-transport.property.test.ts
b/test/lib/telemetry/zstd-transport.property.test.ts new file mode 100644 index 000000000..34e6f7f54 --- /dev/null +++ b/test/lib/telemetry/zstd-transport.property.test.ts @@ -0,0 +1,133 @@ +/** + * Property tests for the zstd transport's compression pipeline. + * + * Exercises our actual `normalizeBody` + `maybeCompress` helpers (not + * just `Bun.zstdCompress` directly) so we verify the real wire path + * round-trips for arbitrary inputs. + * + * Properties under test: + * 1. zstd-encoded compress → decompress round-trips for any byte sequence. + * 2. gzip-encoded compress → gunzip round-trips for any byte sequence. + * 3. UTF-8 string and equivalent `Uint8Array` inputs produce identical + * wire bytes when fed through `normalizeBody` + `maybeCompress`. + * (This validates our string-vs-bytes normalization, not Bun's + * determinism.) + * 4. Sub-threshold inputs are passthrough — `payload === buf` and + * `encodingApplied === "none"`. + */ + +import { describe, expect, test } from "bun:test"; +import { gunzipSync } from "node:zlib"; +import { + asyncProperty, + assert as fcAssert, + property, + string, + uint8Array, +} from "fast-check"; +import { + maybeCompress, + normalizeBody, +} from "../../../src/lib/telemetry/zstd-transport.js"; +import { DEFAULT_NUM_RUNS } from "../../model-based/helpers.js"; + +const ZSTD_THRESHOLD = 1024; +const GZIP_THRESHOLD = 32 * 1024; + +describe("property: maybeCompress round-trip (zstd path)", () => { + test("zstd compress → decompress returns the original bytes", async () => { + await fcAssert( + asyncProperty( + uint8Array({ minLength: ZSTD_THRESHOLD + 1, maxLength: 64 * 1024 }), + async (bytes) => { + const buf = Buffer.from(bytes); + const result = await maybeCompress(buf, "zstd"); + expect(result.encodingApplied).toBe("zstd"); + const decompressed = await Bun.zstdDecompress(result.payload); + expect(Buffer.from(decompressed).equals(buf)).toBe(true); + } + ), + { numRuns: DEFAULT_NUM_RUNS } + ); + }); +}); + 
+describe("property: maybeCompress round-trip (gzip path)", () => { + test("gzip compress → gunzip returns the original bytes", async () => { + await fcAssert( + asyncProperty( + uint8Array({ minLength: GZIP_THRESHOLD + 1, maxLength: 96 * 1024 }), + async (bytes) => { + const buf = Buffer.from(bytes); + const result = await maybeCompress(buf, "gzip"); + expect(result.encodingApplied).toBe("gzip"); + expect(gunzipSync(result.payload).equals(buf)).toBe(true); + } + ), + { numRuns: 25 } // gzip on 96 KiB is slower; fewer runs + ); + }); +}); + +describe("property: normalizeBody string/Uint8Array equivalence", () => { + test("string and TextEncoder-encoded bytes normalize to identical Buffers", () => { + fcAssert( + property(string({ minLength: 0, maxLength: 16 * 1024 }), (s) => { + const fromString = normalizeBody(s); + const fromBytes = normalizeBody(new TextEncoder().encode(s)); + expect(fromString.equals(fromBytes)).toBe(true); + }), + { numRuns: DEFAULT_NUM_RUNS } + ); + }); + + test("string input through full compress pipeline matches bytes input", async () => { + await fcAssert( + asyncProperty( + string({ minLength: ZSTD_THRESHOLD + 1, maxLength: 16 * 1024 }), + async (s) => { + const fromString = await maybeCompress(normalizeBody(s), "zstd"); + const fromBytes = await maybeCompress( + normalizeBody(new TextEncoder().encode(s)), + "zstd" + ); + expect(fromString.encodingApplied).toBe(fromBytes.encodingApplied); + expect(fromString.payload.equals(fromBytes.payload)).toBe(true); + } + ), + { numRuns: DEFAULT_NUM_RUNS } + ); + }); +}); + +describe("property: maybeCompress passthrough below threshold", () => { + test('zstd encoding + body ≤ 1 KiB → encodingApplied="none", payload === buf', async () => { + await fcAssert( + asyncProperty( + uint8Array({ minLength: 0, maxLength: ZSTD_THRESHOLD }), + async (bytes) => { + const buf = Buffer.from(bytes); + const result = await maybeCompress(buf, "zstd"); + expect(result.encodingApplied).toBe("none"); + 
expect(result.payload).toBe(buf); + } + ), + { numRuns: DEFAULT_NUM_RUNS } + ); + }); + + test('gzip encoding + body ≤ 32 KiB → encodingApplied="none", payload === buf', async () => { + await fcAssert( + asyncProperty( + uint8Array({ minLength: 0, maxLength: GZIP_THRESHOLD }), + async (bytes) => { + const buf = Buffer.from(bytes); + const result = await maybeCompress(buf, "gzip"); + expect(result.encodingApplied).toBe("none"); + expect(result.payload).toBe(buf); + } + ), + { numRuns: 25 } // 32 KiB arbitrary alloc is slower; fewer runs + ); + }); +}); diff --git a/test/lib/telemetry/zstd-transport.test.ts b/test/lib/telemetry/zstd-transport.test.ts new file mode 100644 index 000000000..db143cf87 --- /dev/null +++ b/test/lib/telemetry/zstd-transport.test.ts @@ -0,0 +1,697 @@ +/** + * Unit tests for {@link makeCompressedTransport}. + * + * Uses the SDK's `options.httpModule` extension point to inject a mock + * that captures the bytes written to the ClientRequest and synthesizes + * a response. We don't touch real sockets. + * + * Each scenario asserts on one of: + * 1. the `content-encoding` header as observed on the wire, + * 2. the compressed body round-tripping back to the input, + * 3. rate-limit response headers surfacing correctly to the + * `createTransport` layer. + */ + +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { EventEmitter } from "node:events"; +import type { ClientRequest, IncomingHttpHeaders } from "node:http"; +import { gunzipSync } from "node:zlib"; +import { createEnvelope } from "@sentry/core"; +import { + hasZstdSupport, + isNoProxyExempt, + makeCompressedTransport, + maybeCompress, + normalizeBody, + shouldFallbackToDefault, +} from "../../../src/lib/telemetry/zstd-transport.js"; + +/** No-op for SDK callbacks that require a function but return nothing meaningful. 
*/ +function noop(): void { + // intentionally empty +} + +// ── Mock http module ───────────────────────────────────────────────── + +type MockResponseShape = { + statusCode: number; + headers: IncomingHttpHeaders; +}; + +type CapturedRequest = { + chunks: Buffer[]; + options: Record; +}; + +/** + * Build a fake {@link http} module that captures outbound body bytes and + * resolves with a pre-canned response. The returned object exposes both + * the shim and the capture buffer for test assertions. + */ +function buildMockHttpModule(response: MockResponseShape): { + httpModule: { + request: (opts: unknown, cb?: (res: unknown) => void) => ClientRequest; + }; + captured: CapturedRequest; +} { + const captured: CapturedRequest = { chunks: [], options: {} }; + const httpModule = { + request: (opts: unknown, cb?: (res: unknown) => void) => { + captured.options = opts as Record; + + // The ClientRequest must behave as a Writable so that + // `Readable.from(payload).pipe(req)` succeeds. We fake one from + // an EventEmitter and expose `write`/`end` that push into the + // captured buffer. + const req = new EventEmitter() as unknown as ClientRequest & { + write: (chunk: Buffer | string) => boolean; + end: (chunk?: Buffer | string) => void; + }; + req.write = (chunk: Buffer | string) => { + captured.chunks.push( + Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) + ); + return true; + }; + req.end = (chunk?: Buffer | string) => { + if (chunk !== undefined) { + captured.chunks.push( + Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) + ); + } + // Fire the response on next tick so the caller has time to + // finish piping before we resolve. The response object is a + // minimal IncomingMessage-like shape that the transport's + // handler uses. 
+ process.nextTick(() => { + const res = Object.assign(new EventEmitter(), { + statusCode: response.statusCode, + headers: response.headers, + setEncoding: noop, + }); + cb?.(res); + // Drive data/end events the transport listens to. + process.nextTick(() => { + res.emit("data", Buffer.alloc(0)); + res.emit("end"); + }); + }); + }; + return req; + }, + }; + return { httpModule: httpModule as never, captured }; +} + +// ── Tests ──────────────────────────────────────────────────────────── + +const BASE_OPTIONS = { + url: "https://ingest.example.com/api/0/envelope/", + headers: { "x-sentry-auth": "Sentry sentry_key=abc" }, + recordDroppedEvent: noop, +}; + +describe("makeCompressedTransport", () => { + let savedZstd: typeof globalThis.Bun.zstdCompress | undefined; + + afterEach(() => { + // Restore Bun.zstdCompress after tests that stash it. + if (savedZstd !== undefined) { + globalThis.Bun.zstdCompress = savedZstd; + savedZstd = undefined; + } + }); + + test("zstd branch: sets Content-Encoding: zstd and round-trips", async () => { + if (!hasZstdSupport()) { + // On a runtime without zstd this test is meaningless. + return; + } + const { httpModule, captured } = buildMockHttpModule({ + statusCode: 200, + headers: {}, + }); + + const transport = makeCompressedTransport({ + ...BASE_OPTIONS, + httpModule, + }); + + // Build a large envelope — above ZSTD_THRESHOLD (1 KiB). 
+ const payload = "x".repeat(4096); + const envelope: any = createEnvelope({} as any, [ + [{ type: "event" } as any, { data: payload } as any], + ]); + await transport.send(envelope); + + const headers = captured.options.headers as Record; + expect(headers["content-encoding"]).toBe("zstd"); + + const wire = Buffer.concat(captured.chunks); + expect(wire.length).toBeGreaterThan(0); + + // Decompress and verify the payload body is present + const decompressed = await Bun.zstdDecompress(wire); + const text = Buffer.from( + decompressed.buffer, + decompressed.byteOffset, + decompressed.byteLength + ).toString("utf-8"); + expect(text).toContain(payload); + }); + + test("gzip fallback: Bun.zstdCompress absent → Content-Encoding: gzip", async () => { + savedZstd = globalThis.Bun.zstdCompress; + // Stash + remove zstd to force the gzip branch + (globalThis as { Bun: { zstdCompress?: unknown } }).Bun.zstdCompress = + undefined as never; + + try { + const { httpModule, captured } = buildMockHttpModule({ + statusCode: 200, + headers: {}, + }); + + const transport = makeCompressedTransport({ + ...BASE_OPTIONS, + httpModule, + }); + + // Build a payload > GZIP_THRESHOLD (32 KiB). 
+ const payload = "y".repeat(64 * 1024); + const envelope: any = createEnvelope({} as any, [ + [{ type: "event" } as any, { data: payload } as any], + ]); + await transport.send(envelope); + + const headers = captured.options.headers as Record; + expect(headers["content-encoding"]).toBe("gzip"); + + const wire = Buffer.concat(captured.chunks); + const decompressed = gunzipSync(wire); + const text = decompressed.toString("utf-8"); + expect(text).toContain(payload); + } finally { + // afterEach restores savedZstd + } + }); + + test("below threshold: no content-encoding header", async () => { + const { httpModule, captured } = buildMockHttpModule({ + statusCode: 200, + headers: {}, + }); + + const transport = makeCompressedTransport({ + ...BASE_OPTIONS, + httpModule, + }); + + // Tiny envelope - well below 1 KiB zstd threshold + const envelope: any = createEnvelope({} as any, [ + [{ type: "event" } as any, { tiny: "x" } as any], + ]); + await transport.send(envelope); + + const headers = captured.options.headers as Record; + expect(headers["content-encoding"]).toBeUndefined(); + + // The wire body should be exactly the serialized envelope + const wire = Buffer.concat(captured.chunks); + expect(wire.toString("utf-8")).toContain("event"); + }); + + test("rate-limit response headers bubble up (string form)", async () => { + const { httpModule } = buildMockHttpModule({ + statusCode: 429, + headers: { + "retry-after": "60", + "x-sentry-rate-limits": "60:error:organization", + }, + }); + + const transport = makeCompressedTransport({ + ...BASE_OPTIONS, + httpModule, + }); + + const envelope: any = createEnvelope({} as any, [ + [{ type: "event" } as any, { data: "small" } as any], + ]); + const response = await transport.send(envelope); + + expect(response.statusCode).toBe(429); + expect(response.headers?.["retry-after"]).toBe("60"); + expect(response.headers?.["x-sentry-rate-limits"]).toBe( + "60:error:organization" + ); + }); + + test("rate-limit response headers normalize 
array form", async () => { + const { httpModule } = buildMockHttpModule({ + statusCode: 429, + // Some proxies emit duplicate headers as arrays + headers: { + "retry-after": "30", + "x-sentry-rate-limits": [ + "30:transaction:organization", + "60:error:organization", + ] as never, + }, + }); + + const transport = makeCompressedTransport({ + ...BASE_OPTIONS, + httpModule, + }); + + const envelope: any = createEnvelope({} as any, [ + [{ type: "event" } as any, { data: "small" } as any], + ]); + const response = await transport.send(envelope); + + // Array collapsed to first element + expect(response.headers?.["x-sentry-rate-limits"]).toBe( + "30:transaction:organization" + ); + }); + + test("invalid URL: no-op transport returned", async () => { + const transport = makeCompressedTransport({ + ...BASE_OPTIONS, + url: "not a url", + }); + const envelope: any = createEnvelope({} as any, [ + [{ type: "event" } as any, {} as any], + ]); + // No-op transport resolves with empty response, does not throw. + const response = await transport.send(envelope); + expect(response).toEqual({}); + }); + + test("proxy configured: falls back to SDK's makeNodeTransport (no zstd applied)", async () => { + const savedProxy = process.env.https_proxy; + process.env.https_proxy = "http://proxy.internal:3128"; + try { + // The SDK's makeNodeTransport also honors options.httpModule, so + // we can route both paths through our mock and tell them apart by + // the Content-Encoding header on the wire: the zstd path sets it + // for any body > 1 KiB, while the SDK default only sets gzip for + // bodies > 32 KiB. A 4 KiB body therefore distinguishes the two + // — zstd path stamps "zstd", SDK path stamps nothing. 
+ const { httpModule, captured } = buildMockHttpModule({ + statusCode: 200, + headers: {}, + }); + const transport = makeCompressedTransport({ + ...BASE_OPTIONS, + httpModule, + }); + const envelope: any = createEnvelope({} as any, [ + [{ type: "event" } as any, { data: "x".repeat(4096) } as any], + ]); + await transport.send(envelope); + + const headers = captured.options.headers as Record; + expect(headers["content-encoding"]).toBeUndefined(); + // And the wire body is the raw envelope (not zstd-compressed). + const wire = Buffer.concat(captured.chunks); + expect(wire.toString("utf-8")).toContain('"type":"event"'); + } finally { + if (savedProxy === undefined) { + delete process.env.https_proxy; + } else { + process.env.https_proxy = savedProxy; + } + } + }); + + test("network error on socket: promise rejects, nothing throws outward", async () => { + // Mock http module whose request() emits an 'error' event instead of + // responding. The executor's `req.on('error', reject)` must surface + // it to the outer promise, which createTransport's wrapper catches + // and records as network_error. + const throwingMod = { + request: (_opts: unknown, _cb?: unknown) => { + const req = new EventEmitter() as unknown as ClientRequest & { + write: (c: unknown) => boolean; + end: () => void; + }; + req.write = () => true; + req.end = () => { + process.nextTick(() => req.emit("error", new Error("ECONNREFUSED"))); + }; + return req; + }, + }; + const transport = makeCompressedTransport({ + ...BASE_OPTIONS, + httpModule: throwingMod as never, + }); + const envelope: any = createEnvelope({} as any, [ + [{ type: "event" } as any, { data: "x".repeat(4096) } as any], + ]); + // createTransport wraps network errors and re-throws them — a real + // API consumer would swallow this via .catch(). We just assert the + // promise settles (does not hang) and throws an ECONNREFUSED. 
+ await expect(transport.send(envelope)).rejects.toThrow("ECONNREFUSED"); + }); + + test("proxy configured + URL is no_proxy exempt: uses zstd transport", async () => { + const savedProxy = process.env.https_proxy; + const savedNoProxy = process.env.no_proxy; + process.env.https_proxy = "http://proxy.internal:3128"; + process.env.no_proxy = "example.com"; + try { + const { httpModule, captured } = buildMockHttpModule({ + statusCode: 200, + headers: {}, + }); + const transport = makeCompressedTransport({ + ...BASE_OPTIONS, + httpModule, + }); + const envelope: any = createEnvelope({} as any, [ + [{ type: "event" } as any, { data: "small" } as any], + ]); + await transport.send(envelope); + // httpModule was called → the send completed against our mock, + // so the no_proxy exemption kept us on the zstd path. NOTE(review): + // the SDK fallback also honors options.httpModule (see the proxy + // test above), so this assertion mainly guards against proxy hangs. + expect(captured.chunks.length).toBeGreaterThan(0); + } finally { + if (savedProxy === undefined) { + delete process.env.https_proxy; + } else { + process.env.https_proxy = savedProxy; + } + if (savedNoProxy === undefined) { + delete process.env.no_proxy; + } else { + process.env.no_proxy = savedNoProxy; + } + } + }); +}); + +// ── Direct helper tests ────────────────────────────────────────────── + +describe("normalizeBody", () => { + test("string → UTF-8 bytes", () => { + const buf = normalizeBody("hello"); + expect(buf.toString("utf-8")).toBe("hello"); + }); + + test("multi-byte UTF-8 string", () => { + const buf = normalizeBody("café ☕"); + expect(buf.toString("utf-8")).toBe("café ☕"); + }); + + test("Uint8Array → zero-copy Buffer view", () => { + const src = new Uint8Array([1, 2, 3, 4, 5]); + const buf = normalizeBody(src); + expect(buf.length).toBe(5); + expect(Array.from(buf)).toEqual([1, 2, 3, 4, 5]); + }); + + test("Uint8Array with non-zero byteOffset", () => { + const backing = new Uint8Array([9, 9, 1, 2, 3, 9, 9]); + const view = new Uint8Array(backing.buffer, 2, 3); + const buf = normalizeBody(view); + 
expect(Array.from(buf)).toEqual([1, 2, 3]); + }); + + test("empty string", () => { + expect(normalizeBody("").length).toBe(0); + }); + + test("empty Uint8Array", () => { + expect(normalizeBody(new Uint8Array(0)).length).toBe(0); + }); +}); + +describe("maybeCompress", () => { + test("zstd + body above threshold → zstd-compressed", async () => { + if (!hasZstdSupport()) { + return; + } + const buf = Buffer.from("x".repeat(4096)); + const result = await maybeCompress(buf, "zstd"); + expect(result.encodingApplied).toBe("zstd"); + expect(result.payload.length).toBeLessThan(buf.length); + const decompressed = await Bun.zstdDecompress(result.payload); + expect(decompressed.toString("utf-8")).toBe("x".repeat(4096)); + }); + + test("zstd + body below 1 KiB threshold → passthrough", async () => { + const buf = Buffer.from("x".repeat(512)); + const result = await maybeCompress(buf, "zstd"); + expect(result.encodingApplied).toBe("none"); + expect(result.payload).toBe(buf); + }); + + test("gzip + body above 32 KiB threshold → gzip-compressed", async () => { + const buf = Buffer.from("y".repeat(64 * 1024)); + const result = await maybeCompress(buf, "gzip"); + expect(result.encodingApplied).toBe("gzip"); + expect(result.payload.length).toBeLessThan(buf.length); + const decompressed = gunzipSync(result.payload); + expect(decompressed.toString("utf-8")).toBe("y".repeat(64 * 1024)); + }); + + test("gzip + body below 32 KiB threshold → passthrough", async () => { + const buf = Buffer.from("z".repeat(16 * 1024)); + const result = await maybeCompress(buf, "gzip"); + expect(result.encodingApplied).toBe("none"); + expect(result.payload).toBe(buf); + }); + + test("zstd path + Bun.zstdCompress missing mid-flight + >32 KiB → gzip safety net", async () => { + const saved = globalThis.Bun?.zstdCompress; + (globalThis as { Bun: { zstdCompress?: unknown } }).Bun.zstdCompress = + undefined as never; + try { + const buf = Buffer.from("x".repeat(64 * 1024)); + // Encoding pre-selected as "zstd" 
(caller didn't reprobe), but + // the runtime now lacks zstd — the belt-and-braces branch gzips. + const result = await maybeCompress(buf, "zstd"); + expect(result.encodingApplied).toBe("gzip"); + expect(gunzipSync(result.payload).toString("utf-8")).toBe( + "x".repeat(64 * 1024) + ); + } finally { + if (saved !== undefined) { + globalThis.Bun.zstdCompress = saved; + } + } + }); + + test("zstd path + Bun.zstdCompress missing mid-flight + 1-32 KiB → passthrough (matches SDK default)", async () => { + // Edge case: caller selected "zstd" because Bun.zstdCompress was + // present at construction, but the global got swapped out before + // the first send. For bodies between ZSTD_THRESHOLD (1 KiB) and + // GZIP_THRESHOLD (32 KiB) we MUST pass them through uncompressed, + // matching the SDK's default-transport behavior. Otherwise we'd + // gzip bodies the SDK would have shipped raw. + const saved = globalThis.Bun?.zstdCompress; + (globalThis as { Bun: { zstdCompress?: unknown } }).Bun.zstdCompress = + undefined as never; + try { + const buf = Buffer.from("x".repeat(8 * 1024)); + const result = await maybeCompress(buf, "zstd"); + expect(result.encodingApplied).toBe("none"); + expect(result.payload).toBe(buf); + } finally { + if (saved !== undefined) { + globalThis.Bun.zstdCompress = saved; + } + } + }); +}); + +describe("isNoProxyExempt", () => { + let savedNoProxy: string | undefined; + let savedNoProxyUpper: string | undefined; + + beforeEach(() => { + savedNoProxy = process.env.no_proxy; + savedNoProxyUpper = process.env.NO_PROXY; + delete process.env.no_proxy; + delete process.env.NO_PROXY; + }); + + afterEach(() => { + if (savedNoProxy === undefined) { + delete process.env.no_proxy; + } else { + process.env.no_proxy = savedNoProxy; + } + if (savedNoProxyUpper === undefined) { + delete process.env.NO_PROXY; + } else { + process.env.NO_PROXY = savedNoProxyUpper; + } + }); + + test("no env var set → not exempt", () => { + expect(isNoProxyExempt(new 
URL("https://ingest.example.com/"))).toBe(false); + }); + + test("suffix match in no_proxy → exempt", () => { + process.env.no_proxy = "example.com,internal.lan"; + expect(isNoProxyExempt(new URL("https://ingest.example.com/"))).toBe(true); + }); + + test("no match → not exempt", () => { + process.env.no_proxy = "other.com"; + expect(isNoProxyExempt(new URL("https://ingest.example.com/"))).toBe(false); + }); + + test("NO_PROXY (uppercase) also recognized", () => { + process.env.NO_PROXY = "example.com"; + expect(isNoProxyExempt(new URL("https://ingest.example.com/"))).toBe(true); + }); + + test("lowercase takes precedence over uppercase", () => { + process.env.no_proxy = "other.com"; + process.env.NO_PROXY = "example.com"; + // Lowercase wins → uppercase ignored → no match → not exempt + expect(isNoProxyExempt(new URL("https://ingest.example.com/"))).toBe(false); + }); + + test("trims whitespace around comma-separated entries", () => { + // Common config style: `"a.com, b.com"` — both entries should match + // even with the leading space after the comma. + process.env.no_proxy = "other.com, ingest.example.com"; + expect(isNoProxyExempt(new URL("https://ingest.example.com/"))).toBe(true); + }); + + test("ignores empty entries (trailing comma, double comma)", () => { + process.env.no_proxy = "other.com,, , example.com,"; + expect(isNoProxyExempt(new URL("https://ingest.example.com/"))).toBe(true); + // Empty entry must not match every host (would be true if `endsWith("")` + // were ever evaluated). 
+ expect(isNoProxyExempt(new URL("https://other-host.test/"))).toBe(false); + }); + + test("'*' wildcard exempts all hosts", () => { + process.env.no_proxy = "*"; + expect(isNoProxyExempt(new URL("https://ingest.example.com/"))).toBe(true); + expect(isNoProxyExempt(new URL("https://anything.test/"))).toBe(true); + expect(isNoProxyExempt(new URL("http://192.0.2.1:8080/"))).toBe(true); + }); + + test("'*' wildcard alongside other entries still exempts everything", () => { + process.env.no_proxy = "specific.com, *, other.com"; + expect(isNoProxyExempt(new URL("https://unrelated.test/"))).toBe(true); + }); + + test("literal '*' in a host name does not get matched as wildcard", () => { + // The wildcard check requires `*` to be a standalone entry. + // A bizarre value like `"*.example.com"` is treated as a literal + // suffix: `endsWith("*.example.com")` is false for normal hosts. + process.env.no_proxy = "*.example.com"; + expect(isNoProxyExempt(new URL("https://foo.example.com/"))).toBe(false); + }); +}); + +describe("hasZstdSupport", () => { + test("true when Bun.zstdCompress is present (Bun runtime)", () => { + // Running under bun test, so this should always be true. 
+ expect(hasZstdSupport()).toBe(true); + }); + + test("false when Bun.zstdCompress is absent (simulated)", () => { + const saved = globalThis.Bun?.zstdCompress; + (globalThis as { Bun: { zstdCompress?: unknown } }).Bun.zstdCompress = + undefined as never; + try { + expect(hasZstdSupport()).toBe(false); + } finally { + if (saved !== undefined) { + globalThis.Bun.zstdCompress = saved; + } + } + }); +}); + +describe("shouldFallbackToDefault", () => { + const PROXY_VARS = [ + "http_proxy", + "HTTP_PROXY", + "https_proxy", + "HTTPS_PROXY", + "no_proxy", + "NO_PROXY", + ] as const; + const saved: Partial> = {}; + + beforeEach(() => { + for (const k of PROXY_VARS) { + saved[k] = process.env[k]; + delete process.env[k]; + } + }); + + afterEach(() => { + for (const k of PROXY_VARS) { + const v = saved[k]; + if (v === undefined) { + delete process.env[k]; + } else { + process.env[k] = v; + } + } + }); + + const opts = { url: "https://ingest.example.com/", recordDroppedEvent: noop }; + const httpsUrl = new URL("https://ingest.example.com/"); + const httpUrl = new URL("http://ingest.example.com/"); + + test("no proxy configured → no fallback", () => { + expect(shouldFallbackToDefault(httpsUrl, opts)).toBe(false); + }); + + test("options.proxy wins → fallback", () => { + expect( + shouldFallbackToDefault(httpsUrl, { + ...opts, + proxy: "http://proxy.internal:3128", + }) + ).toBe(true); + }); + + test("lowercase https_proxy → fallback (HTTPS URL)", () => { + process.env.https_proxy = "http://proxy.internal:3128"; + expect(shouldFallbackToDefault(httpsUrl, opts)).toBe(true); + }); + + test("uppercase HTTPS_PROXY → fallback (HTTPS URL)", () => { + process.env.HTTPS_PROXY = "http://proxy.internal:3128"; + expect(shouldFallbackToDefault(httpsUrl, opts)).toBe(true); + }); + + test("lowercase wins over uppercase when both are set", () => { + process.env.https_proxy = "http://winning.proxy:3128"; + process.env.HTTPS_PROXY = "http://losing.proxy:3128"; + // Both trigger fallback; this 
just asserts the lookup doesn't + // crash on duplicate vars and that the function still returns true. + expect(shouldFallbackToDefault(httpsUrl, opts)).toBe(true); + }); + + test("HTTPS URL falls back to http_proxy when https_proxy is unset (matches SDK precedent)", () => { + process.env.http_proxy = "http://proxy.internal:3128"; + expect(shouldFallbackToDefault(httpsUrl, opts)).toBe(true); + }); + + test("uppercase HTTP_PROXY → fallback (http URL)", () => { + process.env.HTTP_PROXY = "http://proxy.internal:3128"; + expect(shouldFallbackToDefault(httpUrl, opts)).toBe(true); + }); + + test("uppercase NO_PROXY exemption keeps zstd path even with proxy set", () => { + process.env.HTTPS_PROXY = "http://proxy.internal:3128"; + process.env.NO_PROXY = "example.com"; + expect(shouldFallbackToDefault(httpsUrl, opts)).toBe(false); + }); +});