Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove watch cache from pepr #643

Merged
merged 7 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions journey/pepr-deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ export function peprDeploy() {
}

it("npx pepr monitor should display validation results to console", async () => {
await testValidate();
cmwylie19 marked this conversation as resolved.
Show resolved Hide resolved

const cmd = ['pepr', 'monitor', 'static-test']

const proc = spawn('npx', cmd, { shell: true })
Expand Down
1 change: 0 additions & 1 deletion src/cli/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ export default function (program: RootCmd) {
const filteredFailures = failures
.filter((r: ResponseItem) => !r.allowed)
.map((r: ResponseItem) => r.status.message);
// console.log(`${name} (${uid}) | VALIDATE | ${allow ? "ALLOW" : "DENY"}`);
if (filteredFailures.length > 0) {
console.log(`\n❌ VALIDATE ${name} (${uid})`);
console.debug(`\u001b[1;31m${filteredFailures}\u001b[0m`);
Expand Down
6 changes: 4 additions & 2 deletions src/lib/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,12 @@ export class PeprModule {
this.#controller = new Controller(config, capabilities, opts.beforeHook, opts.afterHook, () => {
// Wait for the controller to be ready before setting up watches
if (isWatchMode() || isDevMode()) {
setupWatch(config.uuid, capabilities).catch(e => {
try {
setupWatch(capabilities);
} catch (e) {
Log.error(e, "Error setting up watch");
process.exit(1);
});
}
}
});

Expand Down
66 changes: 10 additions & 56 deletions src/lib/watch-processor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@

import { beforeEach, describe, expect, it, jest } from "@jest/globals";
import { GenericClass, K8s, KubernetesObject, kind } from "kubernetes-fluent-client";

import { K8sInit, WatchPhase } from "kubernetes-fluent-client/dist/fluent/types";
import { WatchCfg, WatchEvent, Watcher } from "kubernetes-fluent-client/dist/fluent/watch";
import { Capability } from "./capability";
import { PeprStore } from "./k8s";
import { setupWatch } from "./watch-processor";

const uuid = "static-test";

type onCallback = (eventName: string | symbol, listener: (msg: string) => void) => void;

// Mock the dependencies
Expand Down Expand Up @@ -87,25 +83,24 @@ describe("WatchProcessor", () => {
],
} as unknown as Capability);

await setupWatch(uuid, capabilities);
setupWatch(capabilities);

expect(mockK8s).toHaveBeenCalledTimes(3);
expect(mockK8s).toHaveBeenNthCalledWith(1, PeprStore);
expect(mockK8s).toHaveBeenNthCalledWith(2, "someModel", {});
expect(mockK8s).toHaveBeenNthCalledWith(3, "someModel", { name: "bleh" });
expect(mockK8s).toHaveBeenCalledTimes(2);
expect(mockK8s).toHaveBeenNthCalledWith(1, "someModel", {});
expect(mockK8s).toHaveBeenNthCalledWith(2, "someModel", { name: "bleh" });

expect(mockWatch).toHaveBeenCalledTimes(2);
expect(mockWatch).toHaveBeenCalledWith(expect.any(Function), expect.objectContaining(watchCfg));
});

it("should not setup watches if capabilities array is empty", async () => {
await setupWatch(uuid, []);
await setupWatch([]);
expect(mockWatch).toHaveBeenCalledTimes(0);
});

it("should not setup watches if no bindings are present", async () => {
const capabilities = [{ bindings: [] }, { bindings: [] }] as unknown as Capability[];
await setupWatch(uuid, capabilities);
await setupWatch(capabilities);
expect(mockWatch).toHaveBeenCalledTimes(0);
});

Expand All @@ -116,52 +111,11 @@ describe("WatchProcessor", () => {

mockStart.mockRejectedValue(new Error("err") as never);

await setupWatch(uuid, capabilities);
await setupWatch(capabilities);

expect(exitSpy).toHaveBeenCalledWith(1);
});

it("should load the store before setting up watches", async () => {
await setupWatch(uuid, capabilities);
expect(mockGet).toHaveBeenCalledTimes(1);
});

it("should set an interval to update the store every 10 seconds", async () => {
const setIntervalSpy = jest.spyOn(global, "setInterval");

await setupWatch(uuid, capabilities);

expect(setIntervalSpy).toHaveBeenCalledTimes(1);
expect(setIntervalSpy).toHaveBeenCalledWith(expect.any(Function), 10 * 1000);
});

it("should update the store if there are changes every 10 seconds", async () => {
const setIntervalSpy = jest.spyOn(global, "setInterval");

mockEvents.mockImplementation((eventName: string | symbol, listener: (msg: string) => void) => {
if (eventName === WatchEvent.RESOURCE_VERSION) {
expect(listener).toBeInstanceOf(Function);
listener("45");
}
});

await setupWatch(uuid, capabilities);

const flushCache = setIntervalSpy.mock.calls[0][0] as () => void;
flushCache();

expect(mockApply).toHaveBeenCalledTimes(1);
expect(mockApply).toHaveBeenNthCalledWith(1, {
data: {
"42dae115ed-8aa1f3": "756",
"8aa1fde099-32a12": "750",
"57332a1dee-73560": "45",
"57332a1dee-57332": "45",
},
metadata: { name: "pepr-static-test-watch", namespace: "pepr-system" },
});
});

it("should watch for the resource_update event", async () => {
mockEvents.mockImplementation((eventName: string | symbol, listener: (msg: string) => void) => {
if (eventName === WatchEvent.RESOURCE_VERSION) {
Expand All @@ -170,7 +124,7 @@ describe("WatchProcessor", () => {
}
});

await setupWatch(uuid, capabilities);
setupWatch(capabilities);
});

it("should watch for the give_up event", async () => {
Expand All @@ -186,7 +140,7 @@ describe("WatchProcessor", () => {
}
});

await setupWatch(uuid, capabilities);
setupWatch(capabilities);
});

it("should setup watches with correct phases for different events", async () => {
Expand All @@ -205,7 +159,7 @@ describe("WatchProcessor", () => {
},
] as unknown as Capability[];

await setupWatch(uuid, capabilities);
setupWatch(capabilities);

type mockArg = [(payload: kind.Pod, phase: WatchPhase) => void, WatchCfg];

Expand Down
56 changes: 1 addition & 55 deletions src/lib/watch-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,59 +6,15 @@ import { K8s, WatchCfg, WatchEvent } from "kubernetes-fluent-client";
import { WatchPhase } from "kubernetes-fluent-client/dist/fluent/types";
import { Queue } from "./queue";
import { Capability } from "./capability";
import { PeprStore } from "./k8s";
import Log from "./logger";
import { Binding, Event } from "./types";
import { Watcher } from "kubernetes-fluent-client/dist/fluent/watch";
import { GenericClass } from "kubernetes-fluent-client";
import { filterMatcher } from "./helpers";

// Track if the store has been updated
let storeUpdates = false;

const store: Record<string, string> = {};

export async function setupStore(uuid: string) {
const name = `pepr-${uuid}-watch`;
const namespace = "pepr-system";

try {
// Try to read the watch store if it exists
const k8sStore = await K8s(PeprStore).InNamespace(namespace).Get(name);

// Iterate over the store and add the values to the local store
Object.entries(k8sStore.data).forEach(([key, value]) => {
store[key] = value;
});
} catch (e) {
// A store not existing is expected behavior on the first run
Log.debug(e, "Watch store does not exist yet");
}

// Update the store every 10 seconds if there are changes
setInterval(() => {
if (storeUpdates) {
K8s(PeprStore)
.Apply({
metadata: {
name,
namespace,
},
data: store,
})
// Reset the store updates flag
.then(() => (storeUpdates = false))
// Log the error if the store update fails, but don't reset the store updates flag
.catch(e => {
Log.error(e, "Error updating watch store");
});
}
}, 10 * 1000);
}

export async function setupWatch(uuid: string, capabilities: Capability[]) {
await setupStore(uuid);

export function setupWatch(capabilities: Capability[]) {
capabilities.map(capability =>
capability.bindings
.filter(binding => binding.isWatch)
Expand Down Expand Up @@ -134,16 +90,6 @@ async function runBinding(binding: Binding, capabilityNamespaces: string[]) {
const cacheSuffix = createHash("sha224").update(binding.watchCallback!.toString()).digest("hex").substring(0, 5);
const cacheID = [watcher.getCacheID(), cacheSuffix].join("-");

// Track the resource version in the local store
watcher.events.on(WatchEvent.RESOURCE_VERSION, version => {
Log.debug(`Received watch cache: ${cacheID}:${version}`);
if (store[cacheID] !== version) {
Log.debug(`Updating watch cache: ${cacheID}: ${store[cacheID]} => ${version}`);
store[cacheID] = version;
storeUpdates = true;
}
});

// If failure continues, log and exit
watcher.events.on(WatchEvent.GIVE_UP, err => {
Log.error(err, "Watch failed after 5 attempts, giving up");
Expand Down
Loading