diff --git a/tests/k6/Makefile b/tests/k6/Makefile
index ad518b245b..ec925f2297 100644
--- a/tests/k6/Makefile
+++ b/tests/k6/Makefile
@@ -94,5 +94,5 @@ create-secret:
 
 xk6-install:
 	# Install xk6
-	go install github.com/grafana/xk6/cmd/xk6@latest
+	go install go.k6.io/xk6/cmd/xk6@latest
 	xk6 build --with github.com/grafana/xk6-kubernetes
diff --git a/tests/k6/components/k8s.js b/tests/k6/components/k8s.js
new file mode 100644
index 0000000000..4472c899e0
--- /dev/null
+++ b/tests/k6/components/k8s.js
@@ -0,0 +1,104 @@
+import { Kubernetes } from "k6/x/kubernetes";
+import { getConfig } from '../components/settings.js'
+import {
+    awaitStatus,
+    awaitPipelineStatus,
+    awaitExperimentStart,
+    awaitExperimentStop
+} from '../components/scheduler.js';
+import { seldonObjectType } from '../components/seldon.js'
+
+const kubeclient = new Kubernetes();
+const namespace = getConfig().namespace;
+var schedulerClient = null;
+
+export function connectScheduler(schedulerCl) {
+    schedulerClient = schedulerCl
+}
+
+export function disconnectScheduler() {
+    schedulerClient = null
+}
+
+function seldonObjExists(kind, name, ns) {
+    // This is ugly, but xk6-kubernetes kubeclient.get(...) throws an exception if the
+    // underlying k8s CR doesn't exist.
+
+    // The alternative here would be to list all objects of the given kind from the namespace
+    // and see if the one with the specified name exists among them. However, that would end
+    // up being considerably slower, and we don't want to do it on every single
+    // model/pipeline/experiment load or unload.
+    try {
+        kubeclient.get(kind.description, name, ns)
+        return true
+    } catch (error) {
+        return false
+    }
+}
+
+export function loadModel(modelName, data, awaitReady=true) {
+    // TODO: Update existing model with new CR definition.
+    // At the moment, if an object with the same name exists, it will not be
+    // re-loaded with different settings. This is because we get a k8s apply
+    // conflict caused by a FieldManager being set on `.spec.memory`
+    if (!seldonObjExists(seldonObjectType.MODEL, modelName, namespace)) {
+        kubeclient.apply(data)
+        let created = kubeclient.get(seldonObjectType.MODEL.description, modelName, namespace)
+        if ('uid' in created.metadata) {
+            if (awaitReady && schedulerClient != null) {
+                awaitStatus(modelName, "ModelAvailable")
+            }
+        }
+    }
+}
+
+export function unloadModel(modelName, awaitReady=true) {
+    if (seldonObjExists(seldonObjectType.MODEL, modelName, namespace)) {
+        kubeclient.delete(seldonObjectType.MODEL.description, modelName, namespace)
+        if (awaitReady && schedulerClient != null) {
+            awaitStatus(modelName, "ModelTerminated")
+        }
+    }
+}
+
+export function loadPipeline(pipelineName, data, awaitReady=true) {
+    if (!seldonObjExists(seldonObjectType.PIPELINE, pipelineName, namespace)) {
+        kubeclient.apply(data)
+        let created = kubeclient.get(seldonObjectType.PIPELINE.description, pipelineName, namespace)
+        if ('uid' in created.metadata) {
+            if (awaitReady && schedulerClient != null) {
+                awaitPipelineStatus(pipelineName, "PipelineReady")
+            }
+        }
+    }
+}
+
+export function unloadPipeline(pipelineName, awaitReady = true) {
+    if (seldonObjExists(seldonObjectType.PIPELINE, pipelineName, namespace)) {
+        kubeclient.delete(seldonObjectType.PIPELINE.description, pipelineName, namespace)
+        if (awaitReady && schedulerClient != null) {
+            awaitPipelineStatus(pipelineName, "PipelineTerminated")
+        }
+    }
+}
+
+export function loadExperiment(experimentName, data, awaitReady=true) {
+    if (!seldonObjExists(seldonObjectType.EXPERIMENT, experimentName, namespace)) {
+        kubeclient.apply(data)
+        let created = kubeclient.get(seldonObjectType.EXPERIMENT.description, experimentName, namespace)
+        if ('uid' in created.metadata) {
+            if (awaitReady && schedulerClient != null) {
+                awaitExperimentStart(experimentName)
+            }
+        }
+    }
+}
+
+export function unloadExperiment(experimentName, awaitReady=true) {
+    if (seldonObjExists(seldonObjectType.EXPERIMENT, experimentName, namespace)) {
+        kubeclient.delete(seldonObjectType.EXPERIMENT.description, experimentName, namespace)
+        if (awaitReady && schedulerClient != null) {
+            awaitExperimentStop(experimentName)
+        }
+    }
+}
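For reference, the helpers above are meant to be driven from a scenario script roughly as follows. This is a minimal sketch: the model name, namespace, and storage URI are illustrative, and in the suite the CR YAML comes from generateModel() in model.js rather than being written by hand.

import { loadModel, unloadModel } from '../components/k8s.js';

const illustrativeCR = `apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: iris0
  namespace: seldon-mesh
spec:
  storageUri: gs://seldon-models/mlserver/iris
  requirements:
  - sklearn
`;

export default function () {
    // awaitReady=false: no scheduler client has been wired up via connectScheduler(),
    // so the status polling is skipped and only the CR apply/delete is exercised
    loadModel("iris0", illustrativeCR, false)
    unloadModel("iris0", false)
}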
"name": "INPUT0", "contents": { "bytes_contents": grpcBytes }, "datatype": "BYTES", "shape": shape }, { "name": "INPUT1", "contents": { "bytes_contents": grpcBytes }, "datatype": "BYTES", "shape": shape }] } } return payload - } else if (modelName == tfsimple) { - const shape = [inferBatchSize ,16] + } else if (modelName == tfsimple) { + const shape = [inferBatchSize, 16] var data = [] for (var i = 0; i < 16 * inferBatchSize; i++) { data.push(i) } return { - "http": {"inputs":[{"name":"INPUT0","data": data,"datatype":"INT32","shape":shape},{"name":"INPUT1","data":data,"datatype":"INT32","shape":shape}]}, - "grpc": {"inputs":[{"name":"INPUT0","contents":{"int_contents":data},"datatype":"INT32","shape":shape},{"name":"INPUT1","contents":{"int_contents":data},"datatype":"INT32","shape":shape}]} + "http": { "inputs": [{ "name": "INPUT0", "data": data, "datatype": "INT32", "shape": shape }, { "name": "INPUT1", "data": data, "datatype": "INT32", "shape": shape }] }, + "grpc": { "inputs": [{ "name": "INPUT0", "contents": { "int_contents": data }, "datatype": "INT32", "shape": shape }, { "name": "INPUT1", "contents": { "int_contents": data }, "datatype": "INT32", "shape": shape }] } } - } else if (modelName == add10) { + } else if (modelName == add10) { const shape = [4] var data = new Array(4).fill(0.1) return { - "http": {"inputs":[{"name":"INPUT","data": data,"datatype":"FP32","shape":shape}]}, - "grpc": {"inputs":[{"name":"INPUT","contents":{"int_contents":data},"datatype":"FP32","shape":shape}]} + "http": { "inputs": [{ "name": "INPUT", "data": data, "datatype": "FP32", "shape": shape }] }, + "grpc": { "inputs": [{ "name": "INPUT", "contents": { "int_contents": data }, "datatype": "FP32", "shape": shape }] } } } else if (modelName == iris) { const shape = [inferBatchSize, 4] @@ -131,8 +134,8 @@ export function getModelInferencePayload(modelName, inferBatchSize) { data.push(i) } return { - "http": {"inputs": [{"name": "predict", "shape": shape, "datatype": "FP32", "data": [data]}]}, - "grpc": {"inputs":[{"name":"input","contents":{"fp32_contents":data},"datatype":"FP32","shape":shape}]} + "http": { "inputs": [{ "name": "predict", "shape": shape, "datatype": "FP32", "data": [data] }] }, + "grpc": { "inputs": [{ "name": "input", "contents": { "fp32_contents": data }, "datatype": "FP32", "shape": shape }] } } } else if (modelName == sentiment) { const shape = [inferBatchSize] @@ -145,40 +148,40 @@ export function getModelInferencePayload(modelName, inferBatchSize) { grpcBytes.push(base64) } return { - "http": {"inputs": [{"name": "args", "shape": shape, "datatype": "BYTES", "data": httpBytes}]}, - "grpc": {"inputs":[{"name":"args","contents":{"bytes_contents":grpcBytes},"datatype":"BYTES","shape":shape}]} + "http": { "inputs": [{ "name": "args", "shape": shape, "datatype": "BYTES", "data": httpBytes }] }, + "grpc": { "inputs": [{ "name": "args", "contents": { "bytes_contents": grpcBytes }, "datatype": "BYTES", "shape": shape }] } } } else if (modelName == pytorch_cifar10) { const shape = [inferBatchSize, 3, 32, 32] - const data = new Array(3*32*32*inferBatchSize).fill(0.1) + const data = new Array(3 * 32 * 32 * inferBatchSize).fill(0.1) const datatype = "FP32" return { - "http": {"inputs":[{"name":"input__0","data": data,"datatype":datatype,"shape":shape}]}, - "grpc": {"inputs":[{"name":"input__0","contents":{"fp32_contents":data},"datatype":datatype,"shape":shape}]} + "http": { "inputs": [{ "name": "input__0", "data": data, "datatype": datatype, "shape": shape }] }, + "grpc": { "inputs": [{ 
"name": "input__0", "contents": { "fp32_contents": data }, "datatype": datatype, "shape": shape }] } } } else if (modelName == tfmnist) { const shape = [inferBatchSize, 28, 28, 1] - const data = new Array(28*28*inferBatchSize).fill(0.1) + const data = new Array(28 * 28 * inferBatchSize).fill(0.1) const datatype = "FP32" return { - "http": {"inputs":[{"name":"conv2d_input","data": data,"datatype":datatype,"shape":shape}]}, - "grpc": {"inputs":[{"name":"conv2d_input","contents":{"fp32_contents":data},"datatype":datatype,"shape":shape}]} + "http": { "inputs": [{ "name": "conv2d_input", "data": data, "datatype": datatype, "shape": shape }] }, + "grpc": { "inputs": [{ "name": "conv2d_input", "contents": { "fp32_contents": data }, "datatype": datatype, "shape": shape }] } } } else if (modelName == tfresnet152) { const shape = [inferBatchSize, 224, 224, 3] - const data = new Array(3*224*224*inferBatchSize).fill(0.1) + const data = new Array(3 * 224 * 224 * inferBatchSize).fill(0.1) const datatype = "FP32" return { - "http": {"inputs":[{"name":"input_1","data": data,"datatype":datatype,"shape":shape}]}, - "grpc": {"inputs":[{"name":"input_1","contents":{"fp32_contents":data},"datatype":datatype,"shape":shape}]} + "http": { "inputs": [{ "name": "input_1", "data": data, "datatype": datatype, "shape": shape }] }, + "grpc": { "inputs": [{ "name": "input_1", "contents": { "fp32_contents": data }, "datatype": datatype, "shape": shape }] } } } else if (modelName == onnx_gpt2) { const shape = [inferBatchSize, 10] - const data = new Array(10*inferBatchSize).fill(1) + const data = new Array(10 * inferBatchSize).fill(1) const datatype = "INT32" return { - "http": {"inputs":[{"name":"input_ids","data": data,"datatype":datatype,"shape":shape}, {"name":"attention_mask","data": data,"datatype":datatype,"shape":shape}]}, - "grpc": {"inputs":[{"name":"input_ids","contents":{"int_contents":data},"datatype":datatype,"shape":shape}, {"name":"attention_mask","contents":{"int_contents":data},"datatype":datatype,"shape":shape}]} + "http": { "inputs": [{ "name": "input_ids", "data": data, "datatype": datatype, "shape": shape }, { "name": "attention_mask", "data": data, "datatype": datatype, "shape": shape }] }, + "grpc": { "inputs": [{ "name": "input_ids", "contents": { "int_contents": data }, "datatype": datatype, "shape": shape }, { "name": "attention_mask", "contents": { "int_contents": data }, "datatype": datatype, "shape": shape }] } } } else if (modelName == mlflow_wine) { const fields = ["fixed acidity", "volatile acidity", "citric acidity", "residual sugar", "chlorides", "free sulfur dioxide", "total sulfur dioxide", "density", "pH", "sulphates", "alcohol"] @@ -186,7 +189,7 @@ export function getModelInferencePayload(modelName, inferBatchSize) { const data = new Array(1).fill(1) const data_all = new Array(fields.length).fill(1) const datatype = "FP32" - var v2Fields = []; + var v2Fields = []; var v2FieldsGrpc = []; for (var i = 0; i < fields.length; i++) { v2Fields.push({ @@ -197,14 +200,14 @@ export function getModelInferencePayload(modelName, inferBatchSize) { }) v2FieldsGrpc.push({ "name": fields[i], - "contents": {"fp32_contents": data}, + "contents": { "fp32_contents": data }, "datatype": datatype, "shape": shape, }) } return { - "http": {"inputs": v2Fields, "parameters": {"content_type": "pd"}}, - "grpc": {"inputs": v2FieldsGrpc, "parameters": {"content_type": {"string_param": "pd"}}} + "http": { "inputs": v2Fields, "parameters": { "content_type": "pd" } }, + "grpc": { "inputs": v2FieldsGrpc, "parameters": 
{ "content_type": { "string_param": "pd" } } } } } } @@ -217,14 +220,15 @@ export function generateExperiment(experimentName, modelType, modelName1, modelN uri = uri + (uriOffset % modelTemplate.maxUriSuffix).toString() } - const model1 = {"model": { - "meta":{ + const model1 = { + "model": { + "meta": { "name": modelName1 }, - "modelSpec":{ + "modelSpec": { "uri": uri, "requirements": modelTemplate.requirements, - "memoryBytes": (memoryBytes == null)?modelTemplate.memoryBytes:memoryBytes + "memoryBytes": (memoryBytes == null) ? modelTemplate.memoryBytes : memoryBytes }, "deploymentSpec": { "replicas": replicas @@ -232,14 +236,32 @@ export function generateExperiment(experimentName, modelType, modelName1, modelN } } - const model2 = {"model": { - "meta":{ + const model1CR = { + "apiVersion": "mlops.seldon.io/v1alpha1", + "kind": "Model", + "metadata": { + "name": modelName1, + "namespace": getConfig().namespace + }, + "spec": { + "storageUri": uri, + "requirements": modelTemplate.requirements, + "memory": (memoryBytes == null) ? modelTemplate.memoryBytes : memoryBytes, + "replicas": replicas + } + } + + const model1CRYaml = yamlDump(model1CR) + + const model2 = { + "model": { + "meta": { "name": modelName2 }, - "modelSpec":{ + "modelSpec": { "uri": uri, "requirements": modelTemplate.requirements, - "memoryBytes": (memoryBytes == null)?modelTemplate.memoryBytes:memoryBytes + "memoryBytes": (memoryBytes == null) ? modelTemplate.memoryBytes : memoryBytes }, "deploymentSpec": { "replicas": replicas @@ -247,21 +269,57 @@ export function generateExperiment(experimentName, modelType, modelName1, modelN } } - const experiment = {"experiment":{ - "name":experimentName, + const model2CR = { + "apiVersion": "mlops.seldon.io/v1alpha1", + "kind": "Model", + "metadata": { + "name": modelName2, + "namespace": getConfig().namespace + }, + "spec": { + "storageUri": uri, + "requirements": modelTemplate.requirements, + "memory": (memoryBytes == null) ? modelTemplate.memoryBytes : memoryBytes, + "replicas": replicas + } + } + + const model2CRYaml = yamlDump(model2CR) + + const experiment = { + "experiment": { + "name": experimentName, "defaultModel": modelName1, - "candidates":[ - {"modelName": modelName1,"weight":50}, - {"modelName": modelName2,"weight":50} + "candidates": [ + { "modelName": modelName1, "weight": 50 }, + { "modelName": modelName2, "weight": 50 } ] } } + const experimentCR = { + "apiVersion": "mlops.seldon.io/v1alpha1", + "kind": "Experiment", + "metadata": { + "name": experimentName, + "namespace": getConfig().namespace + }, + "spec": { + "default": modelName1, + "candidates": experiment.experiment.candidates + } + } + + const experimentCRYaml = yamlDump(experimentCR) + const inference = getModelInferencePayload(modelType, inferBatchSize) return { - "model1Defn": isProxy ? {"request": model1} : model1, - "model2Defn": isProxy ? {"request": model2} : model2, + "model1Defn": isProxy ? { "request": model1 } : model1, + "model1CRYaml": model1CRYaml, + "model2Defn": isProxy ? 
{ "request": model2 } : model2, + "model2CRYaml": model2CRYaml, "experimentDefn": experiment, + "experimentCRYaml": experimentCRYaml, "inference": JSON.parse(JSON.stringify(inference)) } } @@ -274,14 +332,15 @@ export function generateModel(modelType, modelName, uriOffset, replicas, isProxy uri = uri + (uriOffset % modelTemplate.maxUriSuffix).toString() } - const model = {"model": { - "meta":{ + const model = { + "model": { + "meta": { "name": modelName }, - "modelSpec":{ + "modelSpec": { "uri": uri, "requirements": modelTemplate.requirements, - "memoryBytes": (memoryBytes == null)?modelTemplate.memoryBytes:memoryBytes + "memoryBytes": (memoryBytes == null) ? modelTemplate.memoryBytes : memoryBytes }, "deploymentSpec": { "replicas": replicas @@ -289,21 +348,57 @@ export function generateModel(modelType, modelName, uriOffset, replicas, isProxy } } + const modelCR = { + "apiVersion": "mlops.seldon.io/v1alpha1", + "kind": "Model", + "metadata": { + "name": modelName, + "namespace": getConfig().namespace + }, + "spec": { + "storageUri": uri, + "requirements": modelTemplate.requirements, + "memory": (memoryBytes == null) ? modelTemplate.memoryBytes : memoryBytes, + "replicas": replicas + } + } + + const modelCRYaml = yamlDump(modelCR) + // simple one node pipeline - const pipeline = {"pipeline": { - "name": generatePipelineName(modelName), - "steps": [ - {"name": modelName} - ], - "output":{ - "steps": [modelName] } + const pipeline = { + "pipeline": { + "name": generatePipelineName(modelName), + "steps": [ + { "name": modelName } + ], + "output": { + "steps": [modelName] + } } } + const pipelineCR = { + "apiVersion": "mlops.seldon.io/v1alpha1", + "kind": "Pipeline", + "metadata": { + "name": generatePipelineName(modelName), + "namespace": getConfig().namespace + }, + "spec": { + "steps": pipeline.pipeline.steps, + "output": pipeline.pipeline.output + } + } + + const pipelineCRYaml = yamlDump(pipelineCR) + const inference = getModelInferencePayload(modelType, inferBatchSize) return { - "modelDefn": isProxy ? {"request": model} : model, + "modelDefn": isProxy ? 
{ "request": model } : model, + "modelCRYaml": modelCRYaml, "pipelineDefn": pipeline, // note that we can only deploy a pipeline with a real scheduler + "pipelineCRYaml": pipelineCRYaml, "inference": JSON.parse(JSON.stringify(inference)) } } diff --git a/tests/k6/components/scheduler.js b/tests/k6/components/scheduler.js index 9ba94259ac..ad42c9dc7d 100644 --- a/tests/k6/components/scheduler.js +++ b/tests/k6/components/scheduler.js @@ -16,7 +16,6 @@ export function disconnectScheduler() { } export function loadModel(modelName, data, awaitReady=true) { - //console.log(data) const response = schedulerClient.invoke('seldon.mlops.scheduler.Scheduler/LoadModel', data); if (check(response, {'load model success': (r) => r && r.status === grpc.StatusOK})) { if (awaitReady) { @@ -30,8 +29,6 @@ export function getModelStatus(modelName) { const response = schedulerClient.invoke('seldon.mlops.scheduler.Scheduler/ModelStatus', data); if (check(response, {'model status success': (r) => r && r.status === grpc.StatusOK})) { const responseData = response.message - //console.log(JSON.stringify(response.message)); - if (responseData.versions.length !== 1) { return "" } else { @@ -50,7 +47,6 @@ export function awaitStatus(modelName, status) { export function unloadModel(modelName, awaitReady = true) { const data = {"model":{"name":modelName}} - //console.log(JSON.stringify(data)) const response = schedulerClient.invoke('seldon.mlops.scheduler.Scheduler/UnloadModel', data); if (check(response, {'unload model success': (r) => r && r.status === grpc.StatusOK})) { if (awaitReady) { @@ -73,7 +69,6 @@ export function getPipelineStatus(pipelineName) { const response = schedulerClient.invoke('seldon.mlops.scheduler.Scheduler/PipelineStatus', data); if (check(response, {'pipeline status success': (r) => r && r.status === grpc.StatusOK})) { const responseData = response.message - //console.log(JSON.stringify(response.message)); return responseData.versions[responseData.versions.length-1].state.status } else { return "" @@ -102,7 +97,6 @@ export function isExperimentActive(experimentName) { const response = schedulerClient.invoke('seldon.mlops.scheduler.Scheduler/ExperimentStatus', data); if (check(response, {'experiment status success': (r) => r && r.status === grpc.StatusOK})) { const responseData = response.message - //console.log(JSON.stringify(response.message)); return responseData.active } else { return false @@ -122,7 +116,6 @@ export function awaitExperimentStop(experimentName) { } export function loadExperiment(experimentName, data, awaitReady=true) { - //console.log(data) const response = schedulerClient.invoke('seldon.mlops.scheduler.Scheduler/StartExperiment', data); if (check(response, {'start experiment success': (r) => r && r.status === grpc.StatusOK})) { if (awaitReady) { @@ -132,7 +125,6 @@ export function loadExperiment(experimentName, data, awaitReady=true) { } export function unloadExperiment(experimentName, awaitReady=true) { - //console.log(data) const data = {"name": experimentName} const response = schedulerClient.invoke('seldon.mlops.scheduler.Scheduler/StopExperiment', data); if (check(response, {'stop experiment success': (r) => r && r.status === grpc.StatusOK})) { diff --git a/tests/k6/components/seldon.js b/tests/k6/components/seldon.js new file mode 100644 index 0000000000..1b1e0cff85 --- /dev/null +++ b/tests/k6/components/seldon.js @@ -0,0 +1,5 @@ +export const seldonObjectType = { + MODEL: Symbol("Model.mlops.seldon.io"), + PIPELINE: Symbol("Pipeline.mlops.seldon.io"), + EXPERIMENT: 
Symbol("Experiment.mlops.seldon.io") +}; diff --git a/tests/k6/components/settings.js b/tests/k6/components/settings.js index f5ced599e5..b3d5e50e0b 100644 --- a/tests/k6/components/settings.js +++ b/tests/k6/components/settings.js @@ -1,3 +1,10 @@ +function useKubeControlPlane() { + if (__ENV.USE_KUBE_CONTROL_PLANE) { + return (__ENV.USE_KUBE_CONTROL_PLANE === "true") + } + return false +} + function schedulerEndpoint() { if (__ENV.SCHEDULER_ENDPOINT) { return __ENV.SCHEDULER_ENDPOINT @@ -218,6 +225,7 @@ function podNamespace() { export function getConfig() { return { + "useKubeControlPlane": useKubeControlPlane(), "schedulerEndpoint": schedulerEndpoint(), "inferHttpEndpoint": inferHttpEndpoint(), "inferGrpcEndpoint": inferGrpcEndpoint(), diff --git a/tests/k6/components/utils.js b/tests/k6/components/utils.js index d938013c26..43f90d4ef9 100644 --- a/tests/k6/components/utils.js +++ b/tests/k6/components/utils.js @@ -1,47 +1,46 @@ import { sleep } from 'k6'; import { generateModel, generatePipelineName } from '../components/model.js'; -import { connectScheduler, disconnectScheduler, loadModel, unloadModel, loadPipeline, unloadPipeline, awaitPipelineStatus } from '../components/scheduler.js'; +import { connectScheduler, + disconnectScheduler, + loadModel, + unloadModel, + loadPipeline, + unloadPipeline, + awaitPipelineStatus, + loadExperiment, + unloadExperiment +} from '../components/scheduler.js'; import { connectScheduler as connectSchedulerProxy, disconnectScheduler as disconnectSchedulerProxy, loadModel as loadModelProxy, unloadModel as unloadModelProxy } from '../components/scheduler_proxy.js'; +import { seldonObjectType } from '../components/seldon.js' import { inferGrpcLoop, inferHttpLoop, modelStatusHttp } from '../components/v2.js'; +import * as k8s from '../components/k8s.js'; -export function setupBase(config ) { +export function setupBase(config) { if (config.loadModel) { - - var connectSchedulerFn = connectScheduler - if (config.isSchedulerProxy) { - connectSchedulerFn = connectSchedulerProxy - } - - connectSchedulerFn(config.schedulerEndpoint) + const ctl = connectControlPlaneOps(config) - for (let j = 0; j < config.maxNumModels.length; j++) { + for (let j = 0; j < config.maxNumModels.length; j++) { for (let i = 0; i < config.maxNumModels[j]; i++) { const modelName = config.modelNamePrefix[j] + i.toString() const model = generateModel(config.modelType[j], modelName, 1, config.modelReplicas[j], config.isSchedulerProxy, config.modelMemoryBytes[j], config.inferBatchSize[j]) - const modelDefn = model.modelDefn - const pipelineDefn = model.pipelineDefn - - var loadModelFn = loadModel - if (config.isSchedulerProxy) { - loadModelFn = loadModelProxy - } - loadModelFn(modelName, modelDefn, false) + var defs = getSeldonObjDef(config, model, seldonObjectType.MODEL) + ctl.loadModelFn(modelName, defs.model.modelDefn, false) if (config.isLoadPipeline) { - loadPipeline(generatePipelineName(modelName), pipelineDefn, false) // we use pipeline name as model name + ctl.loadPipelineFn(generatePipelineName(modelName), defs.model.pipelineDefn, false) // we use pipeline name as model name } } } // note: this doesnt work in case of kafka if (!config.isLoadPipeline) { - for (let j = 0; j < config.maxNumModels.length; j++) { + for (let j = 0; j < config.maxNumModels.length; j++) { const n = config.maxNumModels[j] - 1 const modelName = config.modelNamePrefix[j] + n.toString() const modelNameWithVersion = modelName + getVersionSuffix(config.isSchedulerProxy) // first version @@ -50,7 +49,7 @@ export 
diff --git a/tests/k6/components/utils.js b/tests/k6/components/utils.js
index d938013c26..43f90d4ef9 100644
--- a/tests/k6/components/utils.js
+++ b/tests/k6/components/utils.js
@@ -1,47 +1,46 @@
 import { sleep } from 'k6';
 import { generateModel, generatePipelineName } from '../components/model.js';
-import { connectScheduler, disconnectScheduler, loadModel, unloadModel, loadPipeline, unloadPipeline, awaitPipelineStatus } from '../components/scheduler.js';
+import { connectScheduler,
+    disconnectScheduler,
+    loadModel,
+    unloadModel,
+    loadPipeline,
+    unloadPipeline,
+    awaitPipelineStatus,
+    loadExperiment,
+    unloadExperiment
+} from '../components/scheduler.js';
 import { connectScheduler as connectSchedulerProxy,
     disconnectScheduler as disconnectSchedulerProxy,
     loadModel as loadModelProxy,
     unloadModel as unloadModelProxy } from '../components/scheduler_proxy.js';
+import { seldonObjectType } from '../components/seldon.js'
 import { inferGrpcLoop, inferHttpLoop, modelStatusHttp } from '../components/v2.js';
+import * as k8s from '../components/k8s.js';
 
-export function setupBase(config ) {
+export function setupBase(config) {
     if (config.loadModel) {
-
-        var connectSchedulerFn = connectScheduler
-        if (config.isSchedulerProxy) {
-            connectSchedulerFn = connectSchedulerProxy
-        }
-
-        connectSchedulerFn(config.schedulerEndpoint)
+        const ctl = connectControlPlaneOps(config)
 
-        for (let j = 0; j < config.maxNumModels.length; j++) {
+        for (let j = 0; j < config.maxNumModels.length; j++) {
             for (let i = 0; i < config.maxNumModels[j]; i++) {
                 const modelName = config.modelNamePrefix[j] + i.toString()
                 const model = generateModel(config.modelType[j], modelName, 1, config.modelReplicas[j], config.isSchedulerProxy, config.modelMemoryBytes[j], config.inferBatchSize[j])
-                const modelDefn = model.modelDefn
-                const pipelineDefn = model.pipelineDefn
-
-                var loadModelFn = loadModel
-                if (config.isSchedulerProxy) {
-                    loadModelFn = loadModelProxy
-                }
-                loadModelFn(modelName, modelDefn, false)
+                var defs = getSeldonObjDef(config, model, seldonObjectType.MODEL)
+                ctl.loadModelFn(modelName, defs.model.modelDefn, false)
 
                 if (config.isLoadPipeline) {
-                    loadPipeline(generatePipelineName(modelName), pipelineDefn, false) // we use pipeline name as model name
+                    ctl.loadPipelineFn(generatePipelineName(modelName), defs.model.pipelineDefn, false) // we use pipeline name as model name
                 }
             }
         }
 
         // note: this doesnt work in case of kafka
         if (!config.isLoadPipeline) {
-            for (let j = 0; j < config.maxNumModels.length; j++) {
+            for (let j = 0; j < config.maxNumModels.length; j++) {
                 const n = config.maxNumModels[j] - 1
                 const modelName = config.modelNamePrefix[j] + n.toString()
                 const modelNameWithVersion = modelName + getVersionSuffix(config.isSchedulerProxy) // first version
@@ -50,7 +49,7 @@ export function setupBase(config ) {
             }
         }
     } else {
-        for (let j = 0; j < config.maxNumModels.length; j++) {
+        for (let j = 0; j < config.maxNumModels.length; j++) {
             const n = config.maxNumModels[j] - 1
             const modelName = config.modelNamePrefix[j] + n.toString()
             awaitPipelineStatus(generatePipelineName(modelName), "PipelineReady")
@@ -59,12 +58,12 @@ export function setupBase(config ) {
 
     if (config.doWarmup) {
         // warm up
-        for (let j = 0; j < config.maxNumModels.length; j++) {
+        for (let j = 0; j < config.maxNumModels.length; j++) {
             for (let i = 0; i < config.maxNumModels[j]; i++) {
                 const modelName = config.modelNamePrefix[j] + i.toString()
                 const modelNameWithVersion = modelName + getVersionSuffix(config.isSchedulerProxy) // first version
-
+
                 const model = generateModel(config.modelType[j], modelNameWithVersion, 1, 1, config.isSchedulerProxy, config.modelMemoryBytes[j])
 
                 inferHttpLoop(
@@ -73,45 +72,125 @@ export function setupBase(config ) {
             }
         }
 
-        var disconnectSchedulerFn = disconnectScheduler
-        if (config.isSchedulerProxy) {
-            disconnectSchedulerFn = disconnectSchedulerProxy
-        }
-        disconnectSchedulerFn()
+        disconnectControlPlaneOps(ctl, config)
     }
 }
 
 export function teardownBase(config ) {
     if (config.unloadModel) {
-        var connectSchedulerFn = connectScheduler
-        if (config.isSchedulerProxy) {
-            connectSchedulerFn = connectSchedulerProxy
-        }
-        connectSchedulerFn(config.schedulerEndpoint)
+        const ctl = connectControlPlaneOps(config)
 
-        var unloadModelFn = unloadModel
-        if (config.isSchedulerProxy) {
-            unloadModelFn = unloadModelProxy
-        }
-
-        for (let j = 0; j < config.maxNumModels.length; j++) {
+        for (let j = 0; j < config.maxNumModels.length; j++) {
             for (let i = 0; i < config.maxNumModels[j]; i++) {
                 const modelName = config.modelNamePrefix[j] + i.toString()
 
                 // if we have added a pipeline, unloaded it
                 if (config.isLoadPipeline) {
-                    unloadPipeline(generatePipelineName(modelName))
+                    ctl.unloadPipelineFn(generatePipelineName(modelName))
                 }
 
-                unloadModelFn(modelName, false)
+                ctl.unloadModelFn(modelName, false)
             }
         }
 
-        var disconnectSchedulerFn = disconnectScheduler
-        if (config.isSchedulerProxy) {
-            disconnectSchedulerFn = disconnectSchedulerProxy
-        }
-        disconnectSchedulerFn()
+        disconnectControlPlaneOps(ctl, config)
     }
 }
+
+function warnFn(fnName, cause, name, data, awaitReady=true) {
+    console.log("WARN: " + fnName + " function not implemented. " + cause)
+}
+
+export function connectControlPlaneOps(config) {
+    var ctl = {}
+
+    ctl.connectSchedulerFn = connectScheduler
+    ctl.disconnectSchedulerFn = disconnectScheduler
+    if (config.isSchedulerProxy) {
+        ctl.connectSchedulerFn = connectSchedulerProxy
+        ctl.disconnectSchedulerFn = disconnectSchedulerProxy
+    }
+
+    if (config.useKubeControlPlane) {
+        ctl.loadModelFn = k8s.loadModel
+        ctl.unloadModelFn = k8s.unloadModel
+        ctl.loadPipelineFn = k8s.loadPipeline
+        ctl.unloadPipelineFn = k8s.unloadPipeline
+        ctl.loadExperimentFn = k8s.loadExperiment
+        ctl.unloadExperimentFn = k8s.unloadExperiment
+    } else {
+        ctl.loadModelFn = loadModel
+        ctl.unloadModelFn = unloadModel
+        ctl.loadPipelineFn = loadPipeline
+        ctl.unloadPipelineFn = unloadPipeline
+        ctl.loadExperimentFn = loadExperiment
+        ctl.unloadExperimentFn = unloadExperiment
+        if (config.isSchedulerProxy) {
+            const warnCause = "Using SchedulerProxy"
+            ctl.loadModelFn = loadModelProxy
+            ctl.unloadModelFn = unloadModelProxy
+            ctl.loadPipelineFn = warnFn.bind(this, "loadPipeline", warnCause)
+            ctl.unloadPipelineFn = warnFn.bind(this, "unloadPipeline", warnCause)
+            ctl.loadExperimentFn = warnFn.bind(this, "loadExperiment", warnCause)
+            ctl.unloadExperimentFn = warnFn.bind(this, "unloadExperiment", warnCause)
+        }
+    }
+
+    const schedClient = ctl.connectSchedulerFn(config.schedulerEndpoint)
+    // pass scheduler client to k8s for Model/Pipeline status queries
+    if (config.useKubeControlPlane && !config.isSchedulerProxy) {
+        k8s.connectScheduler(schedClient)
+    }
+
+    return ctl
+}
+
+export function disconnectControlPlaneOps(ctl, config) {
+    if (config.useKubeControlPlane && !config.isSchedulerProxy) {
+        k8s.disconnectScheduler()
+    }
+    ctl.disconnectSchedulerFn()
+}
+
+export function getSeldonObjDef(config, object, type) {
+    var objDef = {
+        "model": {
+            "modelDefn": null,
+            "pipelineDefn": null,
+        },
+        "experiment": {
+            "model1Defn": null,
+            "model2Defn": null,
+            "experimentDefn": null
+        }
+    };
+
+    if (config.useKubeControlPlane) {
+        switch (type) {
+            case seldonObjectType.MODEL:
+                objDef.model.modelDefn = object.modelCRYaml
+                objDef.model.pipelineDefn = object.pipelineCRYaml
+                break;
+            case seldonObjectType.EXPERIMENT:
+                objDef.experiment.model1Defn = object.model1CRYaml
+                objDef.experiment.model2Defn = object.model2CRYaml
+                objDef.experiment.experimentDefn = object.experimentCRYaml
+                break;
+        }
+    } else {
+        switch (type) {
+            case seldonObjectType.MODEL:
+                objDef.model.modelDefn = object.modelDefn
+                objDef.model.pipelineDefn = object.pipelineDefn
+                break;
+            case seldonObjectType.EXPERIMENT:
+                objDef.experiment.model1Defn = object.model1Defn
+                objDef.experiment.model2Defn = object.model2Defn
+                objDef.experiment.experimentDefn = object.experimentDefn
+                break;
+        }
+    }
+
+    return objDef
+}
 
 export function doInfer(modelName, modelNameWithVersion, config, isHttp, idx) {
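Putting the pieces together, the per-model flow in setupBase reduces to the following minimal sketch of the intended call sequence (index 0 and awaitReady=false mirror the loop above; the helper then picks CR YAML or scheduler proto payloads depending on useKubeControlPlane):

import { getConfig } from '../components/settings.js';
import { generateModel, generatePipelineName } from '../components/model.js';
import { seldonObjectType } from '../components/seldon.js';
import { connectControlPlaneOps, disconnectControlPlaneOps, getSeldonObjDef } from '../components/utils.js';

export function setup() {
    const config = getConfig()
    const ctl = connectControlPlaneOps(config)

    const modelName = config.modelNamePrefix[0] + "0"
    const model = generateModel(config.modelType[0], modelName, 1, config.modelReplicas[0],
        config.isSchedulerProxy, config.modelMemoryBytes[0], config.inferBatchSize[0])

    // CR YAML when useKubeControlPlane is set, scheduler payloads otherwise
    const defs = getSeldonObjDef(config, model, seldonObjectType.MODEL)
    ctl.loadModelFn(modelName, defs.model.modelDefn, false)
    ctl.loadPipelineFn(generatePipelineName(modelName), defs.model.pipelineDefn, false)

    disconnectControlPlaneOps(ctl, config)
}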
"tfsimple,pytorch_cifar10,tfmnist,mlflow_wine,iris" - name: MODEL_MEMORY_BYTES