diff --git a/core/src/enterprise/buffered-event-stream.ts b/core/src/enterprise/buffered-event-stream.ts
index 20bfbc6bbd..622d5e80b1 100644
--- a/core/src/enterprise/buffered-event-stream.ts
+++ b/core/src/enterprise/buffered-event-stream.ts
@@ -71,13 +71,6 @@ export interface ApiLogBatch extends ApiBatchBase {
 
 export const controlEventNames: Set<string> = new Set(["_workflowRunRegistered"])
 
-/**
- * We use 600 kilobytes as the maximum combined size of the events / log entries in a given batch. This number
- * was chosen to fit comfortably below e.g. nginx' default max request size, while still being able to carry a decent
- * number of records.
- */
-export const MAX_BATCH_BYTES = 600 * 1000 // 600 kilobytes
-
 /**
  * Buffers events and log entries and periodically POSTs them to Garden Enterprise or another Garden service.
  *
@@ -107,6 +100,13 @@ export class BufferedEventStream {
   private bufferedLogEntries: LogEntryEvent[]
   protected intervalMsec = 1000
 
+  /**
+   * We use 600 kilobytes as the maximum combined size of the events / log entries in a given batch. This number
+   * was chosen to fit comfortably below e.g. nginx' default max request size, while still being able to carry a decent
+   * number of records.
+   */
+  private maxBatchBytes = 600 * 1024 // 600 kilobytes
+
   constructor(log: LogEntry, sessionId: string) {
     this.sessionId = sessionId
     this.log = log
@@ -255,7 +255,7 @@ export class BufferedEventStream {
        * We don't throw an exception here, since a failure to stream events and log entries doesn't mean that the
        * command failed.
        */
-      this.log.error(`Error while flushing events and log entries: ${err.message}`)
+      this.log.debug(`Error while flushing events and log entries: ${err.message}`)
     }
   }
 
@@ -300,14 +300,16 @@ export class BufferedEventStream {
   makeBatch<B>(buffered: B[]): B[] {
     const batch: B[] = []
     let batchBytes = 0
-    while (batchBytes < MAX_BATCH_BYTES && buffered.length > 0) {
-      const nextRecordBytes = Buffer.from(JSON.stringify(buffered[0])).length
-      if (batchBytes + nextRecordBytes > MAX_BATCH_BYTES) {
-        break
-      }
-      if (nextRecordBytes > MAX_BATCH_BYTES) {
+    while (batchBytes < this.maxBatchBytes && buffered.length > 0) {
+      const nextRecordBytes = Buffer.from(JSON.stringify(buffered[0])).length
+      if (nextRecordBytes > this.maxBatchBytes) {
         this.log.error(`Event or log entry too large to flush, dropping it.`)
         this.log.debug(JSON.stringify(buffered[0]))
+        buffered.shift() // Drop the oversized record: it can never fit in a batch.
+        continue
+      }
+      if (batchBytes + nextRecordBytes > this.maxBatchBytes) {
+        break
       }
       batch.push(buffered.shift() as B)
       batchBytes += nextRecordBytes
diff --git a/core/test/unit/src/platform/buffered-event-stream.ts b/core/test/unit/src/enterprise/buffered-event-stream.ts
similarity index 58%
rename from core/test/unit/src/platform/buffered-event-stream.ts
rename to core/test/unit/src/enterprise/buffered-event-stream.ts
index d8e22c068e..562b4c5820 100644
--- a/core/test/unit/src/platform/buffered-event-stream.ts
+++ b/core/test/unit/src/enterprise/buffered-event-stream.ts
@@ -11,7 +11,11 @@ import { StreamEvent, LogEntryEvent, BufferedEventStream } from "../../../../src
 import { getLogger } from "../../../../src/logger/logger"
 import { Garden } from "../../../../src/garden"
 import { makeTestGardenA } from "../../../helpers"
-import { find, isMatch } from "lodash"
+import { find, isMatch, range, repeat } from "lodash"
+
+function makeDummyRecord(sizeKb: number) {
+  return { someKey: repeat("a", sizeKb * 1024) }
+}
 
 describe("BufferedEventStream", () => {
   const getConnectionParams = (garden: Garden) => ({
@@ -89,4 +93,42 @@ describe("BufferedEventStream", () => {
 
     expect(find(flushedEvents, (e) => isMatch(e, { name: "_test", payload: "event" }))).to.exist
   })
+
+  describe("makeBatch", () => {
+    const maxBatchBytes = 3 * 1024 // Set this to a low value (3 KB) to keep the test suite's memory use low.
+    it("should pick records until the batch size reaches maxBatchBytes", async () => {
+      const recordSizeKb = 0.5
+      const log = getLogger().placeholder()
+      const bufferedEventStream = new BufferedEventStream(log, "dummy-session-id")
+      bufferedEventStream["maxBatchBytes"] = maxBatchBytes
+      // The records total ~50 KB, which far exceeds maxBatchBytes.
+      const records = range(100).map((_) => makeDummyRecord(recordSizeKb))
+      const batch = bufferedEventStream.makeBatch(records)
+      const batchSize = Buffer.from(JSON.stringify(batch)).length
+      expect(batch.length).to.be.lte(records.length)
+      expect(batch.length).to.be.lte(maxBatchBytes / (recordSizeKb * 1024))
+      expect(batchSize).to.be.lte(maxBatchBytes)
+    })
+
+    it("should drop individual records whose payload size exceeds maxBatchBytes", async () => {
+      const recordSizeKb = 0.5
+      const log = getLogger().placeholder()
+      const bufferedEventStream = new BufferedEventStream(log, "dummy-session-id")
+      bufferedEventStream["maxBatchBytes"] = maxBatchBytes
+      // This record's size exceeds maxBatchBytes, so it should be dropped by `makeBatch`.
+      const tooLarge = {
+        ...makeDummyRecord(maxBatchBytes / 1024 + 3),
+        tag: "tooLarge",
+      }
+      const records = [tooLarge, ...range(100).map((_) => makeDummyRecord(recordSizeKb))]
+      const batch = bufferedEventStream.makeBatch(records)
+      const batchSize = Buffer.from(JSON.stringify(batch)).length
+
+      expect(batch.find((r) => r["tag"] === "tooLarge")).to.be.undefined // We expect `tooLarge` to have been dropped.
+      expect(batch.length).to.be.gte(3)
+      expect(batch.length).to.be.lte(records.length)
+      expect(batch.length).to.be.lte(maxBatchBytes / (recordSizeKb * 1024))
+      expect(batchSize).to.be.lte(maxBatchBytes)
+    })
+  })
 })
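// --- Illustrative sketch, not part of the patch above ---
// A minimal, standalone approximation of the size-capped batching that the diff applies to
// BufferedEventStream.makeBatch, for trying the behavior outside the Garden codebase.
// The names used here (makeSizeCappedBatch, maxBatchBytes, dummyRecords) are hypothetical
// and do not exist in the repository.
function makeSizeCappedBatch<B>(buffered: B[], maxBatchBytes: number): B[] {
  const batch: B[] = []
  let batchBytes = 0
  while (batchBytes < maxBatchBytes && buffered.length > 0) {
    // Measure the serialized size of the next record.
    const nextRecordBytes = Buffer.from(JSON.stringify(buffered[0])).length
    if (nextRecordBytes > maxBatchBytes) {
      // A record that can never fit in any batch is dropped outright.
      buffered.shift()
      continue
    }
    if (batchBytes + nextRecordBytes > maxBatchBytes) {
      // Adding the next record would push this batch over the cap; stop here.
      break
    }
    batch.push(buffered.shift() as B)
    batchBytes += nextRecordBytes
  }
  return batch
}

// Example: with a 3 KiB cap and ~0.5 KiB records, about 5 records fit per batch
// (the JSON key adds a little overhead on top of the 512-byte payload).
const dummyRecords = Array.from({ length: 100 }, () => ({ someKey: "a".repeat(512) }))
console.log(makeSizeCappedBatch(dummyRecords, 3 * 1024).length)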