diff --git a/package-lock.json b/package-lock.json index 2b220688..47551b87 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,7 +12,7 @@ "@types/ramda": "0.29.9", "express": "4.18.2", "fast-json-patch": "3.1.1", - "kubernetes-fluent-client": "1.10.0", + "kubernetes-fluent-client": "2.0.1", "pino": "8.17.2", "pino-pretty": "10.3.1", "prom-client": "15.1.0", @@ -167,9 +167,9 @@ } }, "node_modules/@babel/core": { - "version": "7.23.6", - "resolved": "https://registry.npmjs.org/@babel/core/-/core-7.23.6.tgz", - "integrity": "sha512-FxpRyGjrMJXh7X3wGLGhNDCRiwpWEF74sKjTLDJSG5Kyvow3QZaG0Adbqzi9ZrVjTWpsX+2cxWXD71NMg93kdw==", + "version": "7.23.7", + "resolved": "https://registry.npmjs.org/@babel/core/-/core-7.23.7.tgz", + "integrity": "sha512-+UpDgowcmqe36d4NwqvKsyPMlOLNGMsfMmQ5WGCu+siCe3t3dfe9njrzGfdN4qq+bcNUt0+Vw6haRxBOycs4dw==", "dev": true, "dependencies": { "@ampproject/remapping": "^2.2.0", @@ -177,10 +177,10 @@ "@babel/generator": "^7.23.6", "@babel/helper-compilation-targets": "^7.23.6", "@babel/helper-module-transforms": "^7.23.3", - "@babel/helpers": "^7.23.6", + "@babel/helpers": "^7.23.7", "@babel/parser": "^7.23.6", "@babel/template": "^7.22.15", - "@babel/traverse": "^7.23.6", + "@babel/traverse": "^7.23.7", "@babel/types": "^7.23.6", "convert-source-map": "^2.0.0", "debug": "^4.1.0", @@ -371,13 +371,13 @@ } }, "node_modules/@babel/helpers": { - "version": "7.23.6", - "resolved": "https://registry.npmjs.org/@babel/helpers/-/helpers-7.23.6.tgz", - "integrity": "sha512-wCfsbN4nBidDRhpDhvcKlzHWCTlgJYUUdSJfzXb2NuBssDSIjc3xcb+znA7l+zYsFljAcGM0aFkN40cR3lXiGA==", + "version": "7.23.7", + "resolved": "https://registry.npmjs.org/@babel/helpers/-/helpers-7.23.7.tgz", + "integrity": "sha512-6AMnjCoC8wjqBzDHkuqpa7jAKwvMo4dC+lr/TFBz+ucfulO1XMpDnwWPGBNwClOKZ8h6xn5N81W/R5OrcKtCbQ==", "dev": true, "dependencies": { "@babel/template": "^7.22.15", - "@babel/traverse": "^7.23.6", + "@babel/traverse": "^7.23.7", "@babel/types": "^7.23.6" }, "engines": { @@ -673,9 +673,9 @@ } }, "node_modules/@babel/traverse": { - "version": "7.23.6", - "resolved": "https://registry.npmjs.org/@babel/traverse/-/traverse-7.23.6.tgz", - "integrity": "sha512-czastdK1e8YByZqezMPFiZ8ahwVMh/ESl9vPgvgdB9AmFMGP5jfpFax74AQgl5zj4XHzqeYAg2l8PuUeRS1MgQ==", + "version": "7.23.7", + "resolved": "https://registry.npmjs.org/@babel/traverse/-/traverse-7.23.7.tgz", + "integrity": "sha512-tY3mM8rH9jM0YHFGyfC0/xf+SB5eKUu7HPj7/k3fpi9dAlsMc5YbQvDi0Sh2QTPXqMhyaAtzAr807TIyfQrmyg==", "dev": true, "dependencies": { "@babel/code-frame": "^7.23.5", @@ -1876,9 +1876,9 @@ } }, "node_modules/@kubernetes/client-node": { - "version": "1.0.0-rc3", - "resolved": "https://registry.npmjs.org/@kubernetes/client-node/-/client-node-1.0.0-rc3.tgz", - "integrity": "sha512-bTYMBZXVrjfi98N5EZbrmPtcT9NY+TddunSEc25DcsRF1c5c93e5jT+zFwId19hG8e/ue5deKe7YDQiRYFpMlQ==", + "version": "1.0.0-rc4", + "resolved": "https://registry.npmjs.org/@kubernetes/client-node/-/client-node-1.0.0-rc4.tgz", + "integrity": "sha512-S7UMp/4jKxjrvO9dUHhx3AmiNTSnxtHR7k3+DI7cEuaOB7QaurtTjJJuAYf+Gi/yO6HpeyvB82uPHwfKyqivdg==", "dependencies": { "@types/js-yaml": "^4.0.1", "@types/node": "^20.3.1", @@ -1905,9 +1905,9 @@ } }, "node_modules/@kubernetes/client-node/node_modules/@types/node": { - "version": "20.10.5", - "resolved": "https://registry.npmjs.org/@types/node/-/node-20.10.5.tgz", - "integrity": "sha512-nNPsNE65wjMxEKI93yOP+NPGGBJz/PoN3kZsVLee0XMiJolxSekEVD8wRwBUBqkwc7UWop0edW50yrCQW4CyRw==", + "version": "20.10.6", + "resolved": "https://registry.npmjs.org/@types/node/-/node-20.10.6.tgz", + "integrity": "sha512-Vac8H+NlRNNlAmDfGUP7b5h/KA+AtWIzuXy0E6OyP8f1tCLYAtPvKRRDJjAPqhpCb0t6U2j7/xqAuLEebW2kiw==", "dependencies": { "undici-types": "~5.26.4" } @@ -2141,17 +2141,17 @@ "dev": true }, "node_modules/@types/node": { - "version": "18.19.3", - "resolved": "https://registry.npmjs.org/@types/node/-/node-18.19.3.tgz", - "integrity": "sha512-k5fggr14DwAytoA/t8rPrIz++lXK7/DqckthCmoZOKNsEbJkId4Z//BqgApXBUGrGddrigYa1oqheo/7YmW4rg==", + "version": "18.19.4", + "resolved": "https://registry.npmjs.org/@types/node/-/node-18.19.4.tgz", + "integrity": "sha512-xNzlUhzoHotIsnFoXmJB+yWmBvFZgKCI9TtPIEdYIMM1KWfwuY8zh7wvc1u1OAXlC7dlf6mZVx/s+Y5KfFz19A==", "dependencies": { "undici-types": "~5.26.4" } }, "node_modules/@types/node-fetch": { - "version": "2.6.9", - "resolved": "https://registry.npmjs.org/@types/node-fetch/-/node-fetch-2.6.9.tgz", - "integrity": "sha512-bQVlnMLFJ2d35DkPNjEPmd9ueO/rh5EiaZt2bhqiSarPjZIuIV6bPQVqcrEyvNo+AfTrRGVazle1tl597w3gfA==", + "version": "2.6.10", + "resolved": "https://registry.npmjs.org/@types/node-fetch/-/node-fetch-2.6.10.tgz", + "integrity": "sha512-PPpPK6F9ALFTn59Ka3BaL+qGuipRfxNE8qVgkp0bVixeiR2c2/L+IVOiBdu9JhhT22sWnQEp6YyHGI2b2+CMcA==", "dependencies": { "@types/node": "*", "form-data": "^4.0.0" @@ -2183,9 +2183,9 @@ } }, "node_modules/@types/qs": { - "version": "6.9.10", - "resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.9.10.tgz", - "integrity": "sha512-3Gnx08Ns1sEoCrWssEgTSJs/rsT2vhGP+Ja9cnnk9k4ALxinORlQneLXFeFKOTJMOeZUFD1s7w+w2AphTpvzZw==", + "version": "6.9.11", + "resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.9.11.tgz", + "integrity": "sha512-oGk0gmhnEJK4Yyk+oI7EfXsLayXatCWPHary1MtcmbAifkobT9cM9yutG/hZKIseOU0MqbIwQ/u2nn/Gb+ltuQ==", "dev": true }, "node_modules/@types/ramda": { @@ -2510,9 +2510,9 @@ } }, "node_modules/acorn": { - "version": "8.11.2", - "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.11.2.tgz", - "integrity": "sha512-nc0Axzp/0FILLEVsm4fNwLCwMttvhEI263QtVPQcbpfZZ3ts0hLsZGOpE6czNlid7CJ9MlyH8reXkpsf3YUY4w==", + "version": "8.11.3", + "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.11.3.tgz", + "integrity": "sha512-Y9rRfJG5jcKOE0CLisYbojUjIrIEE7AGMzA/Sm4BslANhbS+cDMpgBdcPT91oJ7OuJ9hYJBx59RjbhxVnrF8Xg==", "peer": true, "bin": { "acorn": "bin/acorn" @@ -3007,9 +3007,9 @@ } }, "node_modules/caniuse-lite": { - "version": "1.0.30001570", - "resolved": "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001570.tgz", - "integrity": "sha512-+3e0ASu4sw1SWaoCtvPeyXp+5PsjigkSt8OXZbF9StH5pQWbxEjLAZE3n8Aup5udop1uRiKA7a4utUk/uoSpUw==", + "version": "1.0.30001572", + "resolved": "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001572.tgz", + "integrity": "sha512-1Pbh5FLmn5y4+QhNyJE9j3/7dK44dGB83/ZMjv/qJk86TvDbjk0LosiZo0i0WB0Vx607qMX9jYrn1VLHCkN4rw==", "dev": true, "funding": [ { @@ -3528,9 +3528,9 @@ "integrity": "sha512-WMwm9LhRUo+WUaRN+vRuETqG89IgZphVSNkdFgeb6sS/E4OrDIN7t48CAewSHXc6C8lefD8KKfr5vY61brQlow==" }, "node_modules/electron-to-chromium": { - "version": "1.4.615", - "resolved": "https://registry.npmjs.org/electron-to-chromium/-/electron-to-chromium-1.4.615.tgz", - "integrity": "sha512-/bKPPcgZVUziECqDc+0HkT87+0zhaWSZHNXqF8FLd2lQcptpmUFwoCSWjCdOng9Gdq+afKArPdEg/0ZW461Eng==", + "version": "1.4.616", + "resolved": "https://registry.npmjs.org/electron-to-chromium/-/electron-to-chromium-1.4.616.tgz", + "integrity": "sha512-1n7zWYh8eS0L9Uy+GskE0lkBUNK83cXTVJI0pU3mGprFsbfSdAc15VTFbo+A+Bq4pwstmL30AVcEU3Fo463lNg==", "dev": true }, "node_modules/emittery": { @@ -5636,17 +5636,17 @@ } }, "node_modules/kubernetes-fluent-client": { - "version": "1.10.0", - "resolved": "https://registry.npmjs.org/kubernetes-fluent-client/-/kubernetes-fluent-client-1.10.0.tgz", - "integrity": "sha512-jkLCc10eKplU0VUEouIMgQYhpn1O3lcan7dX4TPwAT8g0anrH97XcL8FXOHerGYKizq3KDNjRnPkRpBZwlJFfw==", + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/kubernetes-fluent-client/-/kubernetes-fluent-client-2.0.1.tgz", + "integrity": "sha512-MbRr6RIFn8R4Z3I6LjkYYRD/sS7KAegKfdlR7VbW9IxTvm5PsNjh9TyTVZ0D/8RsVmGohiXUhzrTomZz62c8Jw==", "dependencies": { - "@kubernetes/client-node": "1.0.0-rc3", + "@kubernetes/client-node": "1.0.0-rc4", "byline": "5.0.0", "fast-json-patch": "3.1.1", "http-status-codes": "2.3.0", "node-fetch": "2.7.0", "quicktype-core": "23.0.80", - "type-fest": "4.8.3", + "type-fest": "4.9.0", "yargs": "17.7.2" }, "bin": { @@ -5657,9 +5657,9 @@ } }, "node_modules/kubernetes-fluent-client/node_modules/type-fest": { - "version": "4.8.3", - "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-4.8.3.tgz", - "integrity": "sha512-//BaTm14Q/gHBn09xlnKNqfI8t6bmdzx2DXYfPBNofN0WUybCEUDcbCWcTa0oF09lzLjZgPphXAsvRiMK0V6Bw==", + "version": "4.9.0", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-4.9.0.tgz", + "integrity": "sha512-KS/6lh/ynPGiHD/LnAobrEFq3Ad4pBzOlJ1wAnJx9N4EYoqFhMfLIBjUT2UEx4wg5ZE+cC1ob6DCSpppVo+rtg==", "engines": { "node": ">=16" }, @@ -6193,11 +6193,11 @@ } }, "node_modules/openid-client": { - "version": "5.6.1", - "resolved": "https://registry.npmjs.org/openid-client/-/openid-client-5.6.1.tgz", - "integrity": "sha512-PtrWsY+dXg6y8mtMPyL/namZSYVz8pjXz3yJiBNZsEdCnu9miHLB4ELVC85WvneMKo2Rg62Ay7NkuCpM0bgiLQ==", + "version": "5.6.2", + "resolved": "https://registry.npmjs.org/openid-client/-/openid-client-5.6.2.tgz", + "integrity": "sha512-TIVimoK/fAvpiISLcoGZyNJx2TOfd5AE6TXn58FFj6Y8qbU/jqky54Aws7sYKuCph1bLPWSRUa1r/Rd6K21bhg==", "dependencies": { - "jose": "^4.15.1", + "jose": "^4.15.4", "lru-cache": "^6.0.0", "object-hash": "^2.2.0", "oidc-token-hash": "^5.0.3" @@ -6916,9 +6916,9 @@ } }, "node_modules/readable-stream": { - "version": "4.5.1", - "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-4.5.1.tgz", - "integrity": "sha512-uQjbf34vmf/asGnOHQEw07Q4llgMACQZTWWa4MmICS0IKJoHbLwKCy71H3eR99Dw5iYejc6W+pqZZEeqRtUFAw==", + "version": "4.5.2", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-4.5.2.tgz", + "integrity": "sha512-yjavECdqeZ3GLXNgRXgeQEdz9fvDDkNKyHnbHRFtOr7/LcfgBcmct7t/ET+HaCTqfh06OzoAxrkN/IfjJBVe+g==", "dependencies": { "abort-controller": "^3.0.0", "buffer": "^6.0.3", @@ -8042,9 +8042,9 @@ } }, "node_modules/ws": { - "version": "8.15.1", - "resolved": "https://registry.npmjs.org/ws/-/ws-8.15.1.tgz", - "integrity": "sha512-W5OZiCjXEmk0yZ66ZN82beM5Sz7l7coYxpRkzS+p9PP+ToQry8szKh+61eNktr7EA9DOwvFGhfC605jDHbP6QQ==", + "version": "8.16.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.16.0.tgz", + "integrity": "sha512-HS0c//TP7Ina87TfiPUz1rQzMhHrl/SG2guqRcTOIUYD2q8uhUdNHZYJUaQ8aTGPzCh+c6oawMKW35nFl1dxyQ==", "engines": { "node": ">=10.0.0" }, diff --git a/package.json b/package.json index 497a236d..a0f68074 100644 --- a/package.json +++ b/package.json @@ -32,7 +32,7 @@ "@types/ramda": "0.29.9", "express": "4.18.2", "fast-json-patch": "3.1.1", - "kubernetes-fluent-client": "1.10.0", + "kubernetes-fluent-client": "2.0.1", "pino": "8.17.2", "pino-pretty": "10.3.1", "prom-client": "15.1.0", diff --git a/src/lib/controller/index.ts b/src/lib/controller/index.ts index 33c5991e..9b1914a9 100644 --- a/src/lib/controller/index.ts +++ b/src/lib/controller/index.ts @@ -45,12 +45,12 @@ export class Controller { this.#capabilities = capabilities; // Initialize the Pepr store for each capability - new PeprControllerStore(config, capabilities, `pepr-${config.uuid}-store`, () => { + new PeprControllerStore(capabilities, `pepr-${config.uuid}-store`, () => { this.#bindEndpoints(); onReady && onReady(); Log.info("✅ Controller startup complete"); // Initialize the schedule store for each capability - new PeprControllerStore(config, capabilities, `pepr-${config.uuid}-schedule`, () => { + new PeprControllerStore(capabilities, `pepr-${config.uuid}-schedule`, () => { Log.info("✅ Scheduling processed"); }); }); diff --git a/src/lib/controller/store.ts b/src/lib/controller/store.ts index 8a096dbb..92849b1d 100644 --- a/src/lib/controller/store.ts +++ b/src/lib/controller/store.ts @@ -8,7 +8,6 @@ import { startsWith } from "ramda"; import { Capability } from "../capability"; import { PeprStore } from "../k8s"; import Log from "../logger"; -import { ModuleConfig } from "../module"; import { DataOp, DataSender, DataStore, Storage } from "../storage"; const namespace = "pepr-system"; @@ -20,7 +19,7 @@ export class PeprControllerStore { #sendDebounce: NodeJS.Timeout | undefined; #onReady?: () => void; - constructor(config: ModuleConfig, capabilities: Capability[], name: string, onReady?: () => void) { + constructor(capabilities: Capability[], name: string, onReady?: () => void) { this.#onReady = onReady; // Setup Pepr State bindings @@ -71,7 +70,8 @@ export class PeprControllerStore { } #setupWatch = () => { - void K8s(PeprStore, { name: this.#name, namespace }).Watch(this.#receive); + const watcher = K8s(PeprStore, { name: this.#name, namespace }).Watch(this.#receive); + watcher.start().catch(e => Log.error(e, "Error starting Pepr store watch")); }; #receive = (store: PeprStore) => { diff --git a/src/lib/module.ts b/src/lib/module.ts index 33aa7abe..713aa8fb 100644 --- a/src/lib/module.ts +++ b/src/lib/module.ts @@ -9,6 +9,7 @@ import { ValidateError } from "./errors"; import { AdmissionRequest, MutateResponse, ValidateResponse, WebhookIgnore } from "./k8s"; import { CapabilityExport } from "./types"; import { setupWatch } from "./watch-processor"; +import { Log } from "../lib"; /** Global configuration for the Pepr runtime. */ export type ModuleConfig = { @@ -102,7 +103,10 @@ export class PeprModule { this.#controller = new Controller(config, capabilities, opts.beforeHook, opts.afterHook, () => { // Wait for the controller to be ready before setting up watches if (isWatchMode() || isDevMode()) { - setupWatch(capabilities); + setupWatch(config.uuid, capabilities).catch(e => { + Log.error(e, "Error setting up watch"); + process.exit(1); + }); } }); diff --git a/src/lib/watch-processor.test.ts b/src/lib/watch-processor.test.ts index a1c4b1d7..5ae45a0a 100644 --- a/src/lib/watch-processor.test.ts +++ b/src/lib/watch-processor.test.ts @@ -1,71 +1,195 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: 2023-Present The Pepr Authors -import { describe, expect, it, jest } from "@jest/globals"; -import { K8s, KubernetesObject, kind } from "kubernetes-fluent-client"; +import { beforeEach, describe, expect, it, jest } from "@jest/globals"; +import { GenericClass, K8s, KubernetesObject, kind } from "kubernetes-fluent-client"; import { K8sInit, WatchPhase } from "kubernetes-fluent-client/dist/fluent/types"; -import { WatchCfg } from "kubernetes-fluent-client/dist/fluent/watch"; +import { WatchCfg, WatchEvent, Watcher } from "kubernetes-fluent-client/dist/fluent/watch"; import { Capability } from "./capability"; +import { PeprStore } from "./k8s"; import { setupWatch } from "./watch-processor"; +const uuid = "static-test"; + +type onCallback = (eventName: string | symbol, listener: (msg: string) => void) => void; + // Mock the dependencies jest.mock("kubernetes-fluent-client"); describe("WatchProcessor", () => { - it("should setup watches for all bindings with isWatch=true", () => { - const mockK8s = jest.mocked(K8s); - const mockWatch = jest.fn(); - mockK8s.mockImplementation(() => { - return { Watch: mockWatch } as unknown as K8sInit; + const mockStart = jest.fn(); + const mockK8s = jest.mocked(K8s); + const mockApply = jest.fn(); + const mockGet = jest.fn(); + const mockWatch = jest.fn(); + const mockEvents = jest.fn() as jest.MockedFunction; + + const capabilities = [ + { + bindings: [ + { + isWatch: true, + model: "someModel", + filters: {}, + event: "Create", + watchCallback: () => { + console.log("words"); + }, + }, + ], + }, + ] as unknown as Capability[]; + + beforeEach(() => { + jest.resetAllMocks(); + jest.useFakeTimers(); + + mockK8s.mockImplementation(() => { + return { + Apply: mockApply, + InNamespace: jest.fn().mockReturnThis(), + Watch: mockWatch, + Get: mockGet, + } as unknown as K8sInit; + }); + + mockWatch.mockImplementation(() => { + return { + start: mockStart, + getCacheID: jest.fn().mockReturnValue("57332a1dee"), + events: { + on: mockEvents, + }, + } as unknown as Watcher; }); + mockGet.mockImplementation(() => ({ + data: { + "42dae115ed-8aa1f3": "756", + "8aa1fde099-32a12": "750", + }, + })); + + mockApply.mockImplementation(() => Promise.resolve()); + }); + + it("should setup watches for all bindings with isWatch=true", async () => { const watchCfg: WatchCfg = { - retryMax: 3, + retryMax: 5, retryDelaySec: 5, }; - const capabilities = [ - { - bindings: [ - { isWatch: true, model: "someModel", filters: { name: "bleh" }, event: "Create" }, - { isWatch: false, model: "someModel", filters: {}, event: "Create" }, - ], - }, - { - bindings: [{ isWatch: true, model: "someModel", filters: {}, event: "Create" }], - }, - ] as unknown as Capability[]; + capabilities.push({ + bindings: [ + { isWatch: true, model: "someModel", filters: { name: "bleh" }, event: "Create", watchCallback: jest.fn() }, + { isWatch: false, model: "someModel", filters: {}, event: "Create", watchCallback: jest.fn() }, + ], + } as unknown as Capability); - setupWatch(capabilities); + await setupWatch(uuid, capabilities); - expect(mockK8s).toHaveBeenCalledTimes(2); - expect(mockK8s).toHaveBeenNthCalledWith(1, "someModel", { name: "bleh" }); + expect(mockK8s).toHaveBeenCalledTimes(3); + expect(mockK8s).toHaveBeenNthCalledWith(1, PeprStore); expect(mockK8s).toHaveBeenNthCalledWith(2, "someModel", {}); + expect(mockK8s).toHaveBeenNthCalledWith(3, "someModel", { name: "bleh" }); expect(mockWatch).toHaveBeenCalledTimes(2); expect(mockWatch).toHaveBeenCalledWith(expect.any(Function), expect.objectContaining(watchCfg)); }); - it("should not setup watches if capabilities array is empty", () => { - const mockWatch = jest.fn(); - setupWatch([]); + it("should not setup watches if capabilities array is empty", async () => { + await setupWatch(uuid, []); expect(mockWatch).toHaveBeenCalledTimes(0); }); - it("should not setup watches if no bindings are present", () => { - const mockWatch = jest.fn(); + it("should not setup watches if no bindings are present", async () => { const capabilities = [{ bindings: [] }, { bindings: [] }] as unknown as Capability[]; - setupWatch(capabilities); + await setupWatch(uuid, capabilities); expect(mockWatch).toHaveBeenCalledTimes(0); }); - it("should setup watches with correct phases for different events", () => { - const mockWatch = jest.fn(); - jest.mocked(K8s).mockImplementation(() => { - return { Watch: mockWatch } as unknown as K8sInit; + it("should exit if the watch fails to start", async () => { + const exitSpy = jest.spyOn(process, "exit").mockImplementation(() => { + return undefined as never; }); + mockStart.mockRejectedValue(new Error("err") as never); + + await setupWatch(uuid, capabilities); + + expect(exitSpy).toHaveBeenCalledWith(1); + }); + + it("should load the store before setting up watches", async () => { + await setupWatch(uuid, capabilities); + expect(mockGet).toHaveBeenCalledTimes(1); + }); + + it("should set an interval to update the store every 10 seconds", async () => { + const setIntervalSpy = jest.spyOn(global, "setInterval"); + + await setupWatch(uuid, capabilities); + + expect(setIntervalSpy).toHaveBeenCalledTimes(1); + expect(setIntervalSpy).toHaveBeenCalledWith(expect.any(Function), 10 * 1000); + }); + + it("should update the store if there are changes every 10 seconds", async () => { + const setIntervalSpy = jest.spyOn(global, "setInterval"); + + mockEvents.mockImplementation((eventName: string | symbol, listener: (msg: string) => void) => { + if (eventName === WatchEvent.RESOURCE_VERSION) { + expect(listener).toBeInstanceOf(Function); + listener("45"); + } + }); + + await setupWatch(uuid, capabilities); + + const flushCache = setIntervalSpy.mock.calls[0][0] as () => void; + flushCache(); + + expect(mockApply).toHaveBeenCalledTimes(1); + expect(mockApply).toHaveBeenNthCalledWith(1, { + data: { + "42dae115ed-8aa1f3": "756", + "8aa1fde099-32a12": "750", + "57332a1dee-73560": "45", + "57332a1dee-57332": "45", + }, + metadata: { name: "pepr-static-test-watch", namespace: "pepr-system" }, + }); + }); + + it("should watch for the resource_update event", async () => { + mockEvents.mockImplementation((eventName: string | symbol, listener: (msg: string) => void) => { + if (eventName === WatchEvent.RESOURCE_VERSION) { + expect(listener).toBeInstanceOf(Function); + listener("45"); + } + }); + + await setupWatch(uuid, capabilities); + }); + + it("should watch for the give_up event", async () => { + const exitSpy = jest.spyOn(process, "exit").mockImplementation(() => { + return undefined as never; + }); + + mockEvents.mockImplementation((eventName: string | symbol, listener: (msg: string) => void) => { + if (eventName === WatchEvent.GIVE_UP) { + expect(listener).toBeInstanceOf(Function); + listener("err"); + expect(exitSpy).toHaveBeenCalledWith(1); + } + }); + + await setupWatch(uuid, capabilities); + }); + + it("should setup watches with correct phases for different events", async () => { const watchCallbackCreate = jest.fn(); const watchCallbackUpdate = jest.fn(); const watchCallbackDelete = jest.fn(); @@ -81,7 +205,7 @@ describe("WatchProcessor", () => { }, ] as unknown as Capability[]; - setupWatch(capabilities); + await setupWatch(uuid, capabilities); type mockArg = [(payload: kind.Pod, phase: WatchPhase) => void, WatchCfg]; @@ -89,7 +213,7 @@ describe("WatchProcessor", () => { const secondCall = mockWatch.mock.calls[1] as unknown as mockArg; const thirdCall = mockWatch.mock.calls[2] as unknown as mockArg; - expect(firstCall[1].retryMax).toEqual(3); + expect(firstCall[1].retryMax).toEqual(5); expect(firstCall[1].retryDelaySec).toEqual(5); expect(firstCall[0]).toBeInstanceOf(Function); diff --git a/src/lib/watch-processor.ts b/src/lib/watch-processor.ts index f4c65c6b..754d899e 100644 --- a/src/lib/watch-processor.ts +++ b/src/lib/watch-processor.ts @@ -1,15 +1,61 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: 2023-Present The Pepr Authors -import { K8s } from "kubernetes-fluent-client"; +import { createHash } from "crypto"; +import { K8s, WatchCfg, WatchEvent } from "kubernetes-fluent-client"; import { WatchPhase } from "kubernetes-fluent-client/dist/fluent/types"; -import { WatchCfg } from "kubernetes-fluent-client/dist/fluent/watch"; import { Capability } from "./capability"; +import { PeprStore } from "./k8s"; import Log from "./logger"; import { Binding, Event } from "./types"; -export function setupWatch(capabilities: Capability[]) { +// Track if the store has been updated +let storeUpdates = false; + +const store: Record = {}; + +export async function setupStore(uuid: string) { + const name = `pepr-${uuid}-watch`; + const namespace = "pepr-system"; + + try { + // Try to read the watch store if it exists + const k8sStore = await K8s(PeprStore).InNamespace(namespace).Get(name); + + // Iterate over the store and add the values to the local store + Object.entries(k8sStore.data).forEach(([key, value]) => { + store[key] = value; + }); + } catch (e) { + // A store not existing is expected behavior on the first run + Log.debug(e, "Watch store does not exist yet"); + } + + // Update the store every 10 seconds if there are changes + setInterval(() => { + if (storeUpdates) { + K8s(PeprStore) + .Apply({ + metadata: { + name, + namespace, + }, + data: store, + }) + // Reset the store updates flag + .then(() => (storeUpdates = false)) + // Log the error if the store update fails, but don't reset the store updates flag + .catch(e => { + Log.error(e, "Error updating watch store"); + }); + } + }, 10 * 1000); +} + +export async function setupWatch(uuid: string, capabilities: Capability[]) { + await setupStore(uuid); + capabilities .flatMap(c => c.bindings) .filter(binding => binding.isWatch) @@ -30,19 +76,12 @@ async function runBinding(binding: Binding) { const phaseMatch: WatchPhase[] = eventToPhaseMap[binding.event] || eventToPhaseMap[Event.Any]; const watchCfg: WatchCfg = { - retryMax: 3, + retryMax: 5, retryDelaySec: 5, - retryFail(e) { - // If failure continues, log and exit - Log.error(e, "Watch failed after 3 attempts, giving up"); - process.exit(1); - }, - // pino binding explodes unless we wrap it - logFn: (obj: unknown, msg?: string, ...args: unknown[]) => Log.debug(obj, msg, ...args), }; // Watch the resource - await K8s(binding.model, binding.filters).Watch(async (obj, type) => { + const watcher = K8s(binding.model, binding.filters).Watch(async (obj, type) => { Log.debug(obj, `Watch event ${type} received`); // If the type matches the phase, call the watch callback @@ -56,4 +95,38 @@ async function runBinding(binding: Binding) { } } }, watchCfg); + + // Create a unique cache ID for this watch binding in case multiple bindings are watching the same resource + const cacheSuffix = createHash("sha224").update(binding.watchCallback!.toString()).digest("hex").substring(0, 5); + const cacheID = [watcher.getCacheID(), cacheSuffix].join("-"); + + // Track the resource version in the local store + watcher.events.on(WatchEvent.RESOURCE_VERSION, version => { + Log.debug(`Received watch cache: ${cacheID}:${version}`); + if (store[cacheID] !== version) { + Log.debug(`Updating watch cache: ${cacheID}: ${store[cacheID]} => ${version}`); + store[cacheID] = version; + storeUpdates = true; + } + }); + + // If failure continues, log and exit + watcher.events.on(WatchEvent.GIVE_UP, err => { + Log.error(err, "Watch failed after 5 attempts, giving up"); + process.exit(1); + }); + + // Start the watch + try { + const resourceVersion = store[cacheID]; + if (resourceVersion) { + Log.debug(`Starting watch ${binding.model.name} from version ${resourceVersion}`); + watcher.resourceVersion = resourceVersion; + } + + await watcher.start(); + } catch (err) { + Log.error(err, "Error starting watch"); + process.exit(1); + } }