Skip to content
This repository was archived by the owner on Jul 30, 2025. It is now read-only.

Commit d46f591

Browse files
committed
fix(plugins/plugin-kubectl): new kubectl event watching logic may drop events
Fixes #6451
1 parent 9a2bc74 commit d46f591

File tree

2 files changed

+65
-21
lines changed

2 files changed

+65
-21
lines changed

plugins/plugin-kubectl/src/controller/client/direct/watch.ts

Lines changed: 60 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,13 @@ class DirectWatcher implements Abortable, Watcher {
3939
private jobs: (Abortable & FlowControllable)[] = []
4040

4141
/** We only seem to get these once */
42-
private columnDefinitions: MetaTable['columnDefinitions']
42+
private bodyColumnDefinitions: MetaTable['columnDefinitions']
43+
private footerColumnDefinitions: {
44+
lastSeenColumnIdx: number
45+
objectColumnIdx: number
46+
messageColumnIdx: number
47+
nameColumnIdx: number
48+
}
4349

4450
// eslint-disable-next-line no-useless-constructor
4551
public constructor(
@@ -104,20 +110,11 @@ class DirectWatcher implements Abortable, Watcher {
104110

105111
/** Format a MetaTable of events into a string[] */
106112
private formatFooter(table: MetaTable): string[] {
107-
const lastSeenColumnIdx = table.columnDefinitions.findIndex(_ => /Last Seen/i.test(_.name))
108-
const objectColumnIdx = table.columnDefinitions.findIndex(_ => /Object/i.test(_.name))
109-
const messageColumnIdx = table.columnDefinitions.findIndex(_ => /Message/i.test(_.name))
110-
const nameColumnIdx = table.columnDefinitions.findIndex(_ => /Name/i.test(_.name))
111-
112-
if (lastSeenColumnIdx < 0) {
113-
console.error('Unable to format event footer, due to missing Last Seen column', table)
114-
} else if (objectColumnIdx < 0) {
115-
console.error('Unable to format event footer, due to missing Object column', table)
116-
} else if (messageColumnIdx < 0) {
117-
console.error('Unable to format event footer, due to missing Message column', table)
118-
} else if (nameColumnIdx < 0) {
119-
console.error('Unable to format event footer, due to missing Name column', table)
113+
if (!this.footerColumnDefinitions) {
114+
console.error('Dropping footer update, due to missing column definitions')
120115
} else {
116+
const { lastSeenColumnIdx, objectColumnIdx, messageColumnIdx, nameColumnIdx } = this.footerColumnDefinitions
117+
121118
return this.filterFooterRows(table, nameColumnIdx).map(_ => {
122119
const lastSeen = _.cells[lastSeenColumnIdx]
123120
const involvedObjectName = _.cells[objectColumnIdx]
@@ -132,8 +129,44 @@ class DirectWatcher implements Abortable, Watcher {
132129
}
133130
}
134131

132+
/** We pre-process the columnDefinitions for the events, to pick out the column indices of interest. */
133+
private initFooterColumnDefinitions(columnDefinitions: MetaTable['columnDefinitions']) {
134+
const indices = columnDefinitions.reduce(
135+
(indices, _, idx) => {
136+
if (_.name === 'Last Seen') {
137+
indices.lastSeenColumnIdx = idx
138+
} else if (_.name === 'Object') {
139+
indices.objectColumnIdx = idx
140+
} else if (_.name === 'Message') {
141+
indices.messageColumnIdx = idx
142+
} else if (_.name === 'Name') {
143+
indices.nameColumnIdx = idx
144+
}
145+
return indices
146+
},
147+
{ lastSeenColumnIdx: -1, objectColumnIdx: -1, messageColumnIdx: -1, nameColumnIdx: -1 }
148+
)
149+
150+
const { lastSeenColumnIdx, objectColumnIdx, messageColumnIdx, nameColumnIdx } = indices
151+
if (lastSeenColumnIdx < 0) {
152+
console.error('Unable to process footer column definitions, due to missing Last Seen column', columnDefinitions)
153+
} else if (objectColumnIdx < 0) {
154+
console.error('Unable to process footer column definitions, due to missing Object column', columnDefinitions)
155+
} else if (messageColumnIdx < 0) {
156+
console.error('Unable to process footer column definitions, due to missing Message column', columnDefinitions)
157+
} else if (nameColumnIdx < 0) {
158+
console.error('Unable to process footer column definitions, due to missing Name column', columnDefinitions)
159+
} else {
160+
this.footerColumnDefinitions = indices
161+
}
162+
}
163+
135164
/** This will be called by the event streamer when it has new data */
136-
private onEventData(update: WatchUpdate) {
165+
private onEventData(update: Pick<WatchUpdate, 'object'>) {
166+
if (!this.footerColumnDefinitions && update.object.columnDefinitions) {
167+
this.initFooterColumnDefinitions(update.object.columnDefinitions)
168+
}
169+
137170
this.pusher.footer(this.formatFooter(update.object))
138171
}
139172

@@ -142,7 +175,7 @@ class DirectWatcher implements Abortable, Watcher {
142175
// first: we need to fetch the initial table (so that we have a resourceVersion)
143176
const events = (await fetchFile(this.args.REPL, this.formatEventUrl(), headersForTableRequest))[0] as MetaTable
144177
if (isMetaTable(events)) {
145-
this.pusher.footer(this.formatFooter(events))
178+
this.onEventData({ object: events })
146179

147180
// second: now we can start the streamer against that resourceVersion
148181
const watchUrl = this.formatEventUrl({ resourceVersion: events.metadata.resourceVersion })
@@ -162,6 +195,7 @@ class DirectWatcher implements Abortable, Watcher {
162195
}
163196
}
164197

198+
/** This is the stream management bits for the body */
165199
private mgmt(onInit: Arguments['execOptions']['onInit']) {
166200
return {
167201
onInit,
@@ -190,8 +224,8 @@ class DirectWatcher implements Abortable, Watcher {
190224

191225
/** This will be called whenever the streamer has data for us. */
192226
private onData(update: WatchUpdate) {
193-
if (!update.object.columnDefinitions && this.columnDefinitions) {
194-
update.object.columnDefinitions = this.columnDefinitions
227+
if (!update.object.columnDefinitions && this.bodyColumnDefinitions) {
228+
update.object.columnDefinitions = this.bodyColumnDefinitions
195229
}
196230

197231
if (!update.object.columnDefinitions) {
@@ -200,9 +234,9 @@ class DirectWatcher implements Abortable, Watcher {
200234
}
201235

202236
let sendHeaders = false
203-
if (!this.columnDefinitions) {
237+
if (!this.bodyColumnDefinitions) {
204238
sendHeaders = true
205-
this.columnDefinitions = update.object.columnDefinitions
239+
this.bodyColumnDefinitions = update.object.columnDefinitions
206240
}
207241

208242
setTimeout(async () => {
@@ -225,6 +259,12 @@ class DirectWatcher implements Abortable, Watcher {
225259
}
226260
}
227261

262+
/**
263+
* If possible, turn a table into a Table & Watchable. If the given
264+
* `table` does not have a `resourceVersion` attribute, this mapping
265+
* will not be possible.
266+
*
267+
*/
228268
export default async function makeWatchable(
229269
args: Arguments<KubeOptions>,
230270
kind: string | Promise<string>,

plugins/plugin-kubectl/src/lib/util/json.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,14 @@ export default function mkGenerator<T extends Streamable>(
5959
let obj: T
6060
try {
6161
obj = JSON.parse(bundle) as T
62-
onData(obj)
6362
} catch (err) {
6463
console.error('Error parsing bundle', bundle, err)
6564
}
65+
try {
66+
onData(obj)
67+
} catch (err) {
68+
console.error('Error processing bundle', obj, err)
69+
}
6670
bundle = ''
6771
}
6872
}

0 commit comments

Comments
 (0)