Skip to content

Commit 3285e4b

Browse files
fix(cloud): potential OOM when Cloud connection fails (#7861)
Turns out this wasn't fully addressed previously, and there were some scenarios left in where gRPC stream errors could feed back into the buffer and cause a recursion in log entries stacking up. Also extended the timeout to deliver events at the end of execution. Co-authored-by: Jon Edvald <edvald@gmail.com>
1 parent da618ee commit 3285e4b

File tree

3 files changed

+77
-47
lines changed

3 files changed

+77
-47
lines changed

core/src/cloud/api/grpc-event-converter.ts

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import {
1616
} from "@buf/garden_grow-platform.bufbuild_es/garden/public/events/v1/events_pb.js"
1717
import type { EventName as CoreEventName, EventPayload as CoreEventPayload } from "../../events/events.js"
1818
import type { GardenWithNewBackend } from "../../garden.js"
19-
import type { LogSymbol, Msg } from "../../logger/log-entry.js"
19+
import type { LogParams, LogSymbol, Msg } from "../../logger/log-entry.js"
2020
import { isActionLogContext, isCoreLogContext, type Log } from "../../logger/log-entry.js"
2121
import { monotonicFactory, ulid, type ULID, type UUID } from "ulid"
2222
import { create } from "@bufbuild/protobuf"
@@ -57,6 +57,10 @@ import {
5757
LogSymbol as GrpcLogSymbol,
5858
} from "@buf/garden_grow-platform.bufbuild_es/garden/public/events/v1/garden_logs_pb.js"
5959
import type { LogEntryEventPayload } from "../api-legacy/restful-event-stream.js"
60+
import type { StringLogLevel } from "../../logger/logger.js"
61+
62+
export const GRPC_INTERNAL_LOG_ORIGIN = "grpc-event-stream"
63+
export const GRPC_INTERNAL_LOG_PREFIX = "GrpcEventStream:"
6064

6165
const nextEventUlid = monotonicFactory()
6266

@@ -80,7 +84,7 @@ const aecEnvironmentUpdateActionTriggeredMap = {
8084

8185
export class GrpcEventConverter {
8286
private readonly garden: GardenWithNewBackend
83-
private readonly log: Log
87+
private readonly _log: Log
8488
private readonly streamLogEntries: boolean
8589

8690
/**
@@ -92,10 +96,14 @@ export class GrpcEventConverter {
9296

9397
constructor(garden: GardenWithNewBackend, log: Log, streamLogEntries: boolean) {
9498
this.garden = garden
95-
this.log = log
99+
this._log = log
96100
this.streamLogEntries = streamLogEntries
97101
}
98102

103+
private log(level: StringLogLevel, fn: () => string): Log {
104+
return this._log[level](wrapGrpcInternalLog(fn))
105+
}
106+
99107
convert<T extends CoreEventName>(name: T, payload: CoreEventPayload<T>): GrpcEventEnvelope[] {
100108
const context = this.getGardenEventContext(payload)
101109

@@ -166,6 +174,14 @@ export class GrpcEventConverter {
166174
if (!this.streamLogEntries) {
167175
return []
168176
}
177+
if (
178+
payload.context.origin === GRPC_INTERNAL_LOG_ORIGIN ||
179+
(payload.message.msg &&
180+
typeof payload.message.msg === "string" &&
181+
payload.message.msg.startsWith(GRPC_INTERNAL_LOG_PREFIX))
182+
) {
183+
return []
184+
}
169185
const msg = resolveMsg(payload.message.msg)
170186
let rawMsg = resolveMsg(payload.message.rawMsg)
171187

@@ -247,7 +263,7 @@ export class GrpcEventConverter {
247263
const status = aecAgentStatusMap[payload.status]
248264

249265
if (!status) {
250-
this.log.warn(`GrpcEventStream: Unhandled aec agent status '${payload.status}', ignoring event`)
266+
this.log("warn", () => `Unhandled AEC agent status '${payload.status}', ignoring event`)
251267
return []
252268
}
253269

@@ -357,12 +373,12 @@ export class GrpcEventConverter {
357373
]
358374
} else {
359375
payload.operation satisfies never // ensure all cases are handled
360-
this.log.silly(`GrpcEventStream: Unhandled action operation ${payload.operation}`)
376+
this.log("silly", () => `Unhandled action operation ${payload.operation}`)
361377
return []
362378
}
363379
default:
364380
payload satisfies never // ensure all cases are handled
365-
this.log.silly(`GrpcEventStream: Unhandled action state ${payload}`)
381+
this.log("silly", () => `Unhandled action state ${payload}`)
366382
return []
367383
}
368384
}
@@ -574,7 +590,7 @@ export class GrpcEventConverter {
574590

575591
const generatedUlid = ulid()
576592
GrpcEventConverter.uuidToUlidMap.set(uuid, generatedUlid)
577-
this.log.silly(() => `GrpcEventConverter: Mapped ${fromDescription}=${uuid} to ${toDescription}=${generatedUlid}`)
593+
this.log("silly", () => `Mapped ${fromDescription}=${uuid} to ${toDescription}=${generatedUlid}`)
578594
return generatedUlid
579595
}
580596
}
@@ -644,3 +660,10 @@ function resolveMsg(msg: Msg | undefined): string | undefined {
644660
}
645661
return msg
646662
}
663+
664+
export function wrapGrpcInternalLog(fn: () => string): Omit<LogParams, "symbol"> {
665+
return {
666+
msg: () => `${GRPC_INTERNAL_LOG_PREFIX} ${fn()}`,
667+
origin: GRPC_INTERNAL_LOG_ORIGIN,
668+
}
669+
}

core/src/cloud/api/grpc-event-stream.ts

Lines changed: 46 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,14 @@ import type { WritableIterable } from "@connectrpc/connect/protocol"
2626
import { createWritableIterable } from "@connectrpc/connect/protocol"
2727

2828
import { GardenCloudError } from "./api.js"
29-
import { describeGrpcEvent, GrpcEventConverter } from "./grpc-event-converter.js"
29+
import { describeGrpcEvent, GrpcEventConverter, wrapGrpcInternalLog } from "./grpc-event-converter.js"
3030
import { create } from "@bufbuild/protobuf"
3131
import { InternalError } from "../../exceptions.js"
32+
import type { StringLogLevel } from "../../logger/logger.js"
3233

3334
export class GrpcEventStream {
3435
private readonly garden: GardenWithNewBackend
35-
private readonly log: Log
36+
private readonly _log: Log
3637

3738
private readonly eventListener: GardenEventAnyListener<EventName>
3839
private readonly logListener: GardenEventAnyListener<"logEntry">
@@ -64,12 +65,12 @@ export class GrpcEventStream {
6465
streamLogEntries: boolean
6566
}) {
6667
this.garden = garden
67-
this.log = log
68+
this._log = log
6869
this.eventIngestionService = eventIngestionService
6970
this.isClosed = false
7071
this.streamLogEntries = streamLogEntries
7172

72-
this.converter = new GrpcEventConverter(this.garden, this.log, this.streamLogEntries)
73+
this.converter = new GrpcEventConverter(this.garden, this._log, this.streamLogEntries)
7374

7475
// TODO: make sure it waits for the callback function completion
7576
registerCleanupFunction("grow-stream-session-cancelled-event", () => {
@@ -88,31 +89,31 @@ export class GrpcEventStream {
8889
return
8990
}
9091

91-
this.log.root.events.onAny(this.logListener)
92+
this._log.root.events.onAny(this.logListener)
9293

9394
this.eventListener = (name, payload) => {
9495
this.handleEvent(name, payload)
9596
}
9697
this.garden.events.onAny(this.eventListener)
9798

9899
setTimeout(async () => {
99-
this.log.silly("GrpcEventStream: Starting loop")
100+
this.log("silly", () => "Starting loop")
100101

101102
while (!this.isClosed) {
102-
this.log.silly("GrpcEventStream: Connecting ...")
103+
this.log("silly", () => "Connecting...")
103104

104105
try {
105106
await this.streamEvents()
106107
} catch (err) {
107108
if (err instanceof ConnectError) {
108-
this.log.silly(`GrpcEventStream: Error while streaming events: ${err}`)
109-
this.log.silly("GrpcEventStream: Retrying in 1 second...")
109+
this.log("silly", () => `Error while streaming events: ${err}`)
110+
this.log("silly", () => "Retrying in 1 second...")
110111
await sleep(1000)
111112
} else {
112113
// This is a temporary workaround to avoid crashing the process when the new event system is not in production.
113114
// In production, we want to crash the process to surface the issue.
114-
this.log.debug(`GrpcEventStream: Unexpected error while streaming events: ${err}`)
115-
this.log.debug("GrpcEventStream: Bailing out.")
115+
this.log("silly", () => `Unexpected error while streaming events: ${err}`)
116+
this.log("silly", () => "Bailing out.")
116117
break
117118
// TODO(production): remove the code above and uncomment the following.
118119
// This will become an unhandled error and will cause the process to crash.
@@ -123,16 +124,20 @@ export class GrpcEventStream {
123124
}, 0)
124125
}
125126

127+
private log(level: StringLogLevel, fn: () => string): Log {
128+
return this._log[level](wrapGrpcInternalLog(fn))
129+
}
130+
126131
async close() {
127132
if (this.isClosed) {
128133
return
129134
}
130135

131136
this.garden.events.offAny(this.eventListener)
132-
this.log.root.events.offAny(this.logListener)
137+
this._log.root.events.offAny(this.logListener)
133138

134139
if (this.eventBuffer.size === 0) {
135-
this.log.silly("GrpcEventStream: Close called and no events waiting for acknowledgement. Disconnecting...")
140+
this.log("silly", () => "Close called and no events waiting for acknowledgement. Disconnecting...")
136141
this.isClosed = true
137142
// close the connection as well
138143
this.outputStream?.close()
@@ -145,20 +150,22 @@ export class GrpcEventStream {
145150
})
146151

147152
// Wait max 1 second for the events to be acknowledged
148-
const timeout = 1000
153+
const timeout = 5000
149154
// TODO(production): use 30 seconds when we go to production, but for now let's only wait 100ms for acknowledgements.
150155
// const timeout = 30000
151156

152157
const timeoutSec = timeout / 1000
153158
await Promise.race([
154159
promise,
155160
(async () => {
156-
this.log.debug(`Waiting for ${timeoutSec} seconds to flush events to Garden Cloud`)
161+
this.log("verbose", () => `Waiting up to ${timeoutSec} seconds to flush events to Garden Cloud`)
157162
await sleep(timeout)
158-
this.log.debug(
159-
`GrpcEventStream: Not all events were acknowledged within ${timeoutSec} seconds. Information in Garden Cloud may be incomplete.`
163+
this.log(
164+
"debug",
165+
() =>
166+
`Not all events were acknowledged within ${timeoutSec} seconds. Information in Garden Cloud may be incomplete.`
160167
)
161-
this.log.warn("Not all events have been sent to Garden Cloud. Check the debug logs for more details.")
168+
this.log("warn", () => "Not all events have been sent to Garden Cloud. Check the debug logs for more details.")
162169
})(),
163170
])
164171

@@ -172,14 +179,12 @@ export class GrpcEventStream {
172179
try {
173180
events = this.converter.convert(name, payload)
174181
} catch (err) {
175-
this.log.warn(`GrpcEventStream: Error while converting event ${name}: ${err}`)
182+
this.log("warn", () => `Error while converting event ${name}: ${err}`)
176183
return
177184
}
178185

179186
for (const event of events) {
180-
this.log.silly(
181-
() => `GrpcEventStream: ${this.outputStream ? "Sending" : "Buffering"} event ${describeGrpcEvent(event)}`
182-
)
187+
this.log("silly", () => `${this.outputStream ? "Sending" : "Buffering"} event ${describeGrpcEvent(event)}`)
183188

184189
// The eventUlid must be set by the converter call above.
185190
// The field is optional because of the protobuf validation rules.
@@ -196,7 +201,7 @@ export class GrpcEventStream {
196201
?.write(create(IngestEventsRequestSchema, { event }))
197202
// This must be at a log level higher than what's streamed because otherwise we exponentially spam the logs
198203
// when we can't write events to the stream.
199-
.catch((err) => this.log.silly(`GrpcEventStream: Failed to write event ${event.eventUlid}: ${err}`))
204+
.catch((err) => this.log("silly", () => `Failed to write event ${event.eventUlid}: ${err}`))
200205
}
201206
}
202207

@@ -207,18 +212,18 @@ export class GrpcEventStream {
207212
try {
208213
ackStream = this.eventIngestionService.ingestEvents(this.outputStream)
209214
} catch (err) {
210-
this.log.debug(`GrpcEventStream: Failed to start event ingestion bi-directional stream: ${err}`)
215+
this.log("debug", () => `Failed to start event ingestion bi-directional stream: ${err}`)
211216
throw err
212217
}
213218

214-
this.log.silly(() => "GrpcEventStream: Connected")
219+
this.log("silly", () => "Connected")
215220

216221
this.flushEventBuffer()
217222

218223
try {
219224
await this.consumeAcks(ackStream)
220225
} catch (err) {
221-
this.log.debug(`GrpcEventStream: Error while consuming acks: ${err}`)
226+
this.log("silly", () => `Error while consuming acks: ${err}`)
222227
// Let the outer retry handle this
223228
throw err
224229
} finally {
@@ -230,14 +235,16 @@ export class GrpcEventStream {
230235
private async consumeAcks(ackStream: AsyncIterable<IngestEventsResponse>) {
231236
for await (const nextAck of ackStream) {
232237
if (!nextAck.success) {
233-
this.log.debug(
234-
`GrpcEventStream: Server failed to process event ulid=${nextAck.eventUlid}, final=${nextAck.final}: ` +
238+
this.log(
239+
"debug",
240+
() =>
241+
`Server failed to process event ulid=${nextAck.eventUlid}, final=${nextAck.final}: ` +
235242
`${JSON.stringify(this.eventBuffer.get(nextAck.eventUlid), (_, v) =>
236243
typeof v === "bigint" ? v.toString() : v
237244
)}`
238245
)
239246
} else {
240-
this.log.silly(() => `GrpcEventStream: Received ack for event ${nextAck.eventUlid}, final=${nextAck.final}`)
247+
this.log("silly", () => `Received ack for event ${nextAck.eventUlid}, final=${nextAck.final}`)
241248
}
242249

243250
// Remove acknowledged event from the buffer
@@ -248,33 +255,33 @@ export class GrpcEventStream {
248255
const messages = nextAck.messages || []
249256
for (const msg of messages) {
250257
const logMessage = `${this.garden.cloudApi.distroName} failed to process event ulid=${nextAck.eventUlid}: ${msg.text}`
258+
let logLevel: StringLogLevel = "warn"
251259

252260
switch (msg.severity) {
253261
case IngestEventsResponse_Message_Severity.DEBUG:
254-
this.log.debug(logMessage)
262+
logLevel = "debug"
255263
break
256264
case IngestEventsResponse_Message_Severity.INFO:
257-
this.log.info(logMessage)
265+
logLevel = "info"
258266
break
259267
case IngestEventsResponse_Message_Severity.WARNING:
260-
this.log.warn(logMessage)
268+
logLevel = "warn"
261269
break
262270
case IngestEventsResponse_Message_Severity.ERROR:
263271
throw new GardenCloudError({
264272
message: logMessage,
265273
})
266274
case IngestEventsResponse_Message_Severity.UNSPECIFIED:
267-
this.log.debug(
268-
`GrpcEventStream: Unspecified message severity for event ulid=${nextAck.eventUlid}: ${msg.text}`
269-
)
270275
break
271276
default:
272277
msg.severity satisfies never
273278
}
279+
280+
this.log("debug", () => `[${logLevel}] ${logMessage}`)
274281
}
275282

276283
if (this.closeCallbacks.length && this.eventBuffer.size === 0) {
277-
this.log.silly("GrpcEventStream: All events have been acknowledged. Disconnecting...")
284+
this.log("silly", () => "All events have been acknowledged. Disconnecting...")
278285
for (const callback of this.closeCallbacks) {
279286
// Call all the callbacks to notify that the stream is closed
280287
callback()
@@ -295,18 +302,18 @@ export class GrpcEventStream {
295302
return
296303
}
297304

298-
this.log.silly(() => `GrpcEventStream: Flushing ${this.eventBuffer.size} events from the buffer`)
305+
this.log("silly", () => `Flushing ${this.eventBuffer.size} events from the buffer`)
299306

300307
// NOTE: The Map implementation in the javascript runtime guarantees that values will be iterated in the order they were added (FIFO).
301308
for (const event of this.eventBuffer.values()) {
302309
if (!this.outputStream) {
303-
this.log.silly(() => `GrpcEventStream: Stream closed during flush`)
310+
this.log("silly", () => `Stream closed during flush`)
304311
break
305312
}
306313

307314
// NOTE: We ignore the promise on purpose to avoid out-of-order events.
308315
void this.outputStream.write(create(IngestEventsRequestSchema, { event })).catch((err) => {
309-
this.log.debug(`GrpcEventStream: Failed to write event ${event.eventUlid} during flush: ${err}`)
316+
this.log("debug", () => `Failed to write event ${event.eventUlid} during flush: ${err}`)
310317
})
311318
}
312319
}

core/src/logger/log-entry.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ export interface LogEntry<C extends BaseContext = LogContext>
178178
skipEmit?: boolean
179179
}
180180

181-
interface LogParams
181+
export interface LogParams
182182
extends Pick<LogEntry, "metadata" | "msg" | "rawMsg" | "symbol" | "data" | "dataFormat" | "error" | "skipEmit">,
183183
Pick<LogContext, "origin">,
184184
Pick<LogConfig<LogContext>, "showDuration"> {}

0 commit comments

Comments
 (0)