Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(k6): add support for k8s loading/unloading of Seldon CRs #5563

Merged
merged 6 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/k6/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,5 @@ create-secret:

xk6-install:
# Install xk6
go install github.com/grafana/xk6/cmd/xk6@latest
sakoush marked this conversation as resolved.
Show resolved Hide resolved
go install go.k6.io/xk6/cmd/xk6@latest
xk6 build --with github.com/grafana/xk6-kubernetes
104 changes: 104 additions & 0 deletions tests/k6/components/k8s.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { Kubernetes } from "k6/x/kubernetes";
import { getConfig } from '../components/settings.js'
import {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the future we might want also to check the status via kube. not a blocker for this PR though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, was already planning for that. The xk6-kubernetes extension is a bit wierd in that it doesn't have built-in functionality for waiting on a given condition/status. Also, it throws exceptions for any unexpected conditions, so I'll have to code that a bit defensively (via repeated resource gets).

awaitStatus,
awaitPipelineStatus,
awaitExperimentStart,
awaitExperimentStop
} from '../components/scheduler.js';
import { seldonObjectType } from '../components/seldon.js'

const kubeclient = new Kubernetes();
const namespace = getConfig().namespace;
var schedulerClient = null;

export function connectScheduler(schedulerCl) {
schedulerClient = schedulerCl
}

export function disconnectScheduler() {
schedulerClient = null
}

function seldonObjExists(kind, name, ns) {
// This is ugly, but xk6-kubernetes kubeclient.get(...) throws an exception if the
// underlying k8s CR doesn't exist.

// The alternative here would be to list all objects of the given kind from the namespace
// and see if the one with the specified name exists among them. However, that would end
// up being considerably slower, and we don't want to do it on every single
// model/pipeline/experiment load or unload.
try {
kubeclient.get(kind.description, name, ns)
return true
} catch(error) {
return false
}
}

export function loadModel(modelName, data, awaitReady=true) {
// TODO: Update existing model with new CR definition.
// At the moment, if an object with the same name exists, it will not be
// re-loaded with different settings. This is because we get a k8s apply
// conflict caused by a FieldManager being set on `.spec.memory`
if(!seldonObjExists(seldonObjectType.MODEL, modelName, namespace)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have to check if the model already exists?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a model already exists here we get an exception. The exception likely happens only when the loaded model has the same CR as an existing one (it is an apply after all).

kubeclient.apply(data)
let created = kubeclient.get(seldonObjectType.MODEL.description, modelName, namespace)
if ('uid' in created.metadata) {
if (awaitReady && schedulerClient != null) {
awaitStatus(modelName, "ModelAvailable")
}
}
}
}

export function unloadModel(modelName, awaitReady=true) {
if(seldonObjExists(seldonObjectType.MODEL, modelName, namespace)) {
kubeclient.delete(seldonObjectType.MODEL.description, modelName, namespace)
if (awaitReady && schedulerClient != null) {
awaitStatus(modelName, "ModelTerminated")
}
}
}

export function loadPipeline(pipelineName, data, awaitReady=true) {
if(!seldonObjExists(seldonObjectType.PIPELINE, pipelineName, namespace)) {
kubeclient.apply(data)
let created = kubeclient.get(seldonObjectType.PIPELINE.description, pipelineName, namespace)
if ('uid' in created.metadata) {
if (awaitReady && schedulerClient != null) {
awaitStatus(pipelineName, "PipelineReady")
}
}
}
}

export function unloadPipeline(pipelineName, awaitReady = true) {
if(seldonObjExists(seldonObjectType.PIPELINE, pipelineName, namespace)) {
kubeclient.delete(seldonObjectType.PIPELINE.description, pipelineName, namespace)
if (awaitReady && schedulerClient != null) {
awaitStatus(pipelineName, "PipelineTerminated")
}
}
}

export function loadExperiment(experimentName, data, awaitReady=true) {
if(!seldonObjExists(seldonObjectType.EXPERIMENT, experimentName, namespace)) {
kubeclient.apply(data)
let created = kubeclient.get(seldonObjectType.EXPERIMENT.description, experimentName, namespace)
if ('uid' in created.metadata) {
if (awaitReady && schedulerClient != null) {
awaitExperimentStart(experimentName)
}
}
}
}

export function unloadExperiment(experimentName, awaitReady=true) {
if(seldonObjExists(seldonObjectType.EXPERIMENT, experimentName, namespace)) {
kubeclient.delete(seldonObjExists.EXPERIMENT.description, experimentName, namespace)
if (awaitReady && schedulerClient != null) {
awaitExperimentStop(experimentName)
}
}
}
Loading
Loading