Skip to content

Commit

Permalink
chore: remove watch cache from pepr (#643)
Browse files Browse the repository at this point in the history
## Description

Watch Cache is causing more problems that it is fixing. Kubernetes
Fluent Client is a way to track resource versions. We are considering
removing the watch cache from Pepr

## Related Issue

Fixes #642 
Fixes #640 (Because the security scans were stuck running for 20 mins
and I needed to restart them)
<!-- or -->
Relates to #

## Type of change

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [x] Other (security config, docs update, etc)

## Checklist before merging

- [ ] Test, docs, adr added or updated as needed
- [ ] [Contributor Guide
Steps](https://github.com/defenseunicorns/pepr/blob/main/CONTRIBUTING.md#submitting-a-pull-request)
followed

---------

Signed-off-by: Case Wylie <cmwylie19@defenseunicorns.com>
  • Loading branch information
cmwylie19 committed Mar 13, 2024
1 parent 1564f61 commit 667de73
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 115 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
registry-url: "https://registry.npmjs.org"
cache: "npm"

- name: "Zarf Agent: Login to GHCR"
- name: "Pepr Controller: Login to GHCR"
uses: docker/login-action@343f7c4344506bcbf9b4de18042ae17996df046d # v3.0.0
with:
registry: ghcr.io
Expand Down
2 changes: 2 additions & 0 deletions journey/pepr-deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ export function peprDeploy() {
}

it("npx pepr monitor should display validation results to console", async () => {
await testValidate();

const cmd = ['pepr', 'monitor', 'static-test']

const proc = spawn('npx', cmd, { shell: true })
Expand Down
1 change: 0 additions & 1 deletion src/cli/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ export default function (program: RootCmd) {
const filteredFailures = failures
.filter((r: ResponseItem) => !r.allowed)
.map((r: ResponseItem) => r.status.message);
// console.log(`${name} (${uid}) | VALIDATE | ${allow ? "ALLOW" : "DENY"}`);
if (filteredFailures.length > 0) {
console.log(`\n❌ VALIDATE ${name} (${uid})`);
console.debug(`\u001b[1;31m${filteredFailures}\u001b[0m`);
Expand Down
6 changes: 4 additions & 2 deletions src/lib/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,12 @@ export class PeprModule {
this.#controller = new Controller(config, capabilities, opts.beforeHook, opts.afterHook, () => {
// Wait for the controller to be ready before setting up watches
if (isWatchMode() || isDevMode()) {
setupWatch(config.uuid, capabilities).catch(e => {
try {
setupWatch(capabilities);
} catch (e) {
Log.error(e, "Error setting up watch");
process.exit(1);
});
}
}
});

Expand Down
66 changes: 10 additions & 56 deletions src/lib/watch-processor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@

import { beforeEach, describe, expect, it, jest } from "@jest/globals";
import { GenericClass, K8s, KubernetesObject, kind } from "kubernetes-fluent-client";

import { K8sInit, WatchPhase } from "kubernetes-fluent-client/dist/fluent/types";
import { WatchCfg, WatchEvent, Watcher } from "kubernetes-fluent-client/dist/fluent/watch";
import { Capability } from "./capability";
import { PeprStore } from "./k8s";
import { setupWatch } from "./watch-processor";

const uuid = "static-test";

type onCallback = (eventName: string | symbol, listener: (msg: string) => void) => void;

// Mock the dependencies
Expand Down Expand Up @@ -87,25 +83,24 @@ describe("WatchProcessor", () => {
],
} as unknown as Capability);

await setupWatch(uuid, capabilities);
setupWatch(capabilities);

expect(mockK8s).toHaveBeenCalledTimes(3);
expect(mockK8s).toHaveBeenNthCalledWith(1, PeprStore);
expect(mockK8s).toHaveBeenNthCalledWith(2, "someModel", {});
expect(mockK8s).toHaveBeenNthCalledWith(3, "someModel", { name: "bleh" });
expect(mockK8s).toHaveBeenCalledTimes(2);
expect(mockK8s).toHaveBeenNthCalledWith(1, "someModel", {});
expect(mockK8s).toHaveBeenNthCalledWith(2, "someModel", { name: "bleh" });

expect(mockWatch).toHaveBeenCalledTimes(2);
expect(mockWatch).toHaveBeenCalledWith(expect.any(Function), expect.objectContaining(watchCfg));
});

it("should not setup watches if capabilities array is empty", async () => {
await setupWatch(uuid, []);
await setupWatch([]);
expect(mockWatch).toHaveBeenCalledTimes(0);
});

it("should not setup watches if no bindings are present", async () => {
const capabilities = [{ bindings: [] }, { bindings: [] }] as unknown as Capability[];
await setupWatch(uuid, capabilities);
await setupWatch(capabilities);
expect(mockWatch).toHaveBeenCalledTimes(0);
});

Expand All @@ -116,52 +111,11 @@ describe("WatchProcessor", () => {

mockStart.mockRejectedValue(new Error("err") as never);

await setupWatch(uuid, capabilities);
await setupWatch(capabilities);

expect(exitSpy).toHaveBeenCalledWith(1);
});

it("should load the store before setting up watches", async () => {
await setupWatch(uuid, capabilities);
expect(mockGet).toHaveBeenCalledTimes(1);
});

it("should set an interval to update the store every 10 seconds", async () => {
const setIntervalSpy = jest.spyOn(global, "setInterval");

await setupWatch(uuid, capabilities);

expect(setIntervalSpy).toHaveBeenCalledTimes(1);
expect(setIntervalSpy).toHaveBeenCalledWith(expect.any(Function), 10 * 1000);
});

it("should update the store if there are changes every 10 seconds", async () => {
const setIntervalSpy = jest.spyOn(global, "setInterval");

mockEvents.mockImplementation((eventName: string | symbol, listener: (msg: string) => void) => {
if (eventName === WatchEvent.RESOURCE_VERSION) {
expect(listener).toBeInstanceOf(Function);
listener("45");
}
});

await setupWatch(uuid, capabilities);

const flushCache = setIntervalSpy.mock.calls[0][0] as () => void;
flushCache();

expect(mockApply).toHaveBeenCalledTimes(1);
expect(mockApply).toHaveBeenNthCalledWith(1, {
data: {
"42dae115ed-8aa1f3": "756",
"8aa1fde099-32a12": "750",
"57332a1dee-73560": "45",
"57332a1dee-57332": "45",
},
metadata: { name: "pepr-static-test-watch", namespace: "pepr-system" },
});
});

it("should watch for the resource_update event", async () => {
mockEvents.mockImplementation((eventName: string | symbol, listener: (msg: string) => void) => {
if (eventName === WatchEvent.RESOURCE_VERSION) {
Expand All @@ -170,7 +124,7 @@ describe("WatchProcessor", () => {
}
});

await setupWatch(uuid, capabilities);
setupWatch(capabilities);
});

it("should watch for the give_up event", async () => {
Expand All @@ -186,7 +140,7 @@ describe("WatchProcessor", () => {
}
});

await setupWatch(uuid, capabilities);
setupWatch(capabilities);
});

it("should setup watches with correct phases for different events", async () => {
Expand All @@ -205,7 +159,7 @@ describe("WatchProcessor", () => {
},
] as unknown as Capability[];

await setupWatch(uuid, capabilities);
setupWatch(capabilities);

type mockArg = [(payload: kind.Pod, phase: WatchPhase) => void, WatchCfg];

Expand Down
56 changes: 1 addition & 55 deletions src/lib/watch-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,59 +6,15 @@ import { K8s, WatchCfg, WatchEvent } from "kubernetes-fluent-client";
import { WatchPhase } from "kubernetes-fluent-client/dist/fluent/types";
import { Queue } from "./queue";
import { Capability } from "./capability";
import { PeprStore } from "./k8s";
import Log from "./logger";
import { Binding, Event } from "./types";
import { Watcher } from "kubernetes-fluent-client/dist/fluent/watch";
import { GenericClass } from "kubernetes-fluent-client";
import { filterMatcher } from "./helpers";

// Track if the store has been updated
let storeUpdates = false;

const store: Record<string, string> = {};

export async function setupStore(uuid: string) {
const name = `pepr-${uuid}-watch`;
const namespace = "pepr-system";

try {
// Try to read the watch store if it exists
const k8sStore = await K8s(PeprStore).InNamespace(namespace).Get(name);

// Iterate over the store and add the values to the local store
Object.entries(k8sStore.data).forEach(([key, value]) => {
store[key] = value;
});
} catch (e) {
// A store not existing is expected behavior on the first run
Log.debug(e, "Watch store does not exist yet");
}

// Update the store every 10 seconds if there are changes
setInterval(() => {
if (storeUpdates) {
K8s(PeprStore)
.Apply({
metadata: {
name,
namespace,
},
data: store,
})
// Reset the store updates flag
.then(() => (storeUpdates = false))
// Log the error if the store update fails, but don't reset the store updates flag
.catch(e => {
Log.error(e, "Error updating watch store");
});
}
}, 10 * 1000);
}

export async function setupWatch(uuid: string, capabilities: Capability[]) {
await setupStore(uuid);

export function setupWatch(capabilities: Capability[]) {
capabilities.map(capability =>
capability.bindings
.filter(binding => binding.isWatch)
Expand Down Expand Up @@ -134,16 +90,6 @@ async function runBinding(binding: Binding, capabilityNamespaces: string[]) {
const cacheSuffix = createHash("sha224").update(binding.watchCallback!.toString()).digest("hex").substring(0, 5);
const cacheID = [watcher.getCacheID(), cacheSuffix].join("-");

// Track the resource version in the local store
watcher.events.on(WatchEvent.RESOURCE_VERSION, version => {
Log.debug(`Received watch cache: ${cacheID}:${version}`);
if (store[cacheID] !== version) {
Log.debug(`Updating watch cache: ${cacheID}: ${store[cacheID]} => ${version}`);
store[cacheID] = version;
storeUpdates = true;
}
});

// If failure continues, log and exit
watcher.events.on(WatchEvent.GIVE_UP, err => {
Log.error(err, "Watch failed after 5 attempts, giving up");
Expand Down

0 comments on commit 667de73

Please sign in to comment.