Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add resource generic load, unload, status to CLI #4660

Merged
merged 2 commits into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions operator/cmd/seldon/cli/load.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package cli

import (
"k8s.io/utils/env"

"github.com/seldonio/seldon-core/operator/v2/pkg/cli"
"github.com/spf13/cobra"
)

func createLoad() *cobra.Command {
cmd := &cobra.Command{
Use: "load",
Short: "load resources",
Long: `load resources`,
Args: cobra.MinimumNArgs(0),
RunE: func(cmd *cobra.Command, args []string) error {
flags := cmd.Flags()

schedulerHostIsSet := flags.Changed(flagSchedulerHost)
schedulerHost, err := flags.GetString(flagSchedulerHost)
if err != nil {
return err
}
authority, err := flags.GetString(flagAuthority)
if err != nil {
return err
}
filename, err := flags.GetString(flagFile)
if err != nil {
return err
}
showRequest, err := flags.GetBool(flagShowRequest)
if err != nil {
return err
}
showResponse, err := flags.GetBool(flagShowResponse)
if err != nil {
return err
}

schedulerClient, err := cli.NewSchedulerClient(schedulerHost, schedulerHostIsSet, authority)
if err != nil {
return err
}

var dataFile []byte
if filename != "" {
dataFile = loadFile(filename)
}
err = schedulerClient.Load(dataFile, showRequest, showResponse)
return err
},
}

flags := cmd.Flags()
flags.BoolP(flagShowRequest, "r", false, "show request")
flags.BoolP(flagShowResponse, "o", false, "show response")
flags.String(flagSchedulerHost, env.GetString(envScheduler, defaultSchedulerHost), helpSchedulerHost)
flags.String(flagAuthority, "", helpAuthority)
flags.StringP(flagFile, "f", "", "model manifest file (YAML)")

return cmd
}
7 changes: 6 additions & 1 deletion operator/cmd/seldon/cli/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,16 @@ func GetCmd() *cobra.Command {
cmdConfigRemove := createConfigRemove()
cmdConfigList := createConfigList()

// Generic commands
cmdLoad := createLoad()
cmdUnload := createUnload()
cmdStatus := createStatus()

var rootCmd = &cobra.Command{Use: "seldon", SilenceErrors: false, SilenceUsage: true}

rootCmd.DisableAutoGenTag = true

rootCmd.AddCommand(cmdModel, cmdServer, cmdExperiment, cmdPipeline, cmdConfig)
rootCmd.AddCommand(cmdModel, cmdServer, cmdExperiment, cmdPipeline, cmdConfig, cmdLoad, cmdUnload, cmdStatus)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to keep consistency with previous usage (i.e. seldon model load and seldon pipeline load), an alternative is perhaps to use
seldon all load? I am personally not entirely sure which approach is better so happy to leave it as is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems a bit verbose - maybe will leave for now

cmdModel.AddCommand(cmdModelLoad, cmdModelUnload, cmdModelStatus, cmdModelInfer, cmdModelMeta, cmdModelList)
cmdServer.AddCommand(cmdServerStatus, cmdServerList)
cmdExperiment.AddCommand(cmdExperimentStart, cmdExperimentStop, cmdExperimentStatus, cmdExperimentList)
Expand Down
67 changes: 67 additions & 0 deletions operator/cmd/seldon/cli/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package cli

import (
"k8s.io/utils/env"

"github.com/seldonio/seldon-core/operator/v2/pkg/cli"
"github.com/spf13/cobra"
)

func createStatus() *cobra.Command {
cmd := &cobra.Command{
Use: "status <pipelineName>",
Short: "status of a pipeline",
Long: `status of a pipeline`,
Args: cobra.ExactArgs(0),
RunE: func(cmd *cobra.Command, args []string) error {
flags := cmd.Flags()

schedulerHostIsSet := flags.Changed(flagSchedulerHost)
schedulerHost, err := flags.GetString(flagSchedulerHost)
if err != nil {
return err
}
authority, err := flags.GetString(flagAuthority)
if err != nil {
return err
}
showRequest, err := flags.GetBool(flagShowRequest)
if err != nil {
return err
}
showResponse, err := flags.GetBool(flagShowResponse)
if err != nil {
return err
}
waitCondition, err := flags.GetBool(flagWaitCondition)
if err != nil {
return err
}
filename, err := flags.GetString(flagFile)
if err != nil {
return err
}
var dataFile []byte
if filename != "" {
dataFile = loadFile(filename)
}
schedulerClient, err := cli.NewSchedulerClient(schedulerHost, schedulerHostIsSet, authority)
if err != nil {
return err
}

err = schedulerClient.Status(dataFile, showRequest, showResponse, waitCondition)
return err
},
}

flags := cmd.Flags()
flags.BoolP(flagShowRequest, "r", false, "show request")
flags.BoolP(flagShowResponse, "o", false, "show response")
flags.String(flagSchedulerHost, env.GetString(envScheduler, defaultSchedulerHost), helpSchedulerHost)
flags.String(flagAuthority, "", helpAuthority)
flags.BoolP(flagWaitCondition, "w", false, "wait for resources to be ready")
flags.StringP(flagFile, "f", "", "model manifest file (YAML)")

return cmd
}
62 changes: 62 additions & 0 deletions operator/cmd/seldon/cli/unload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package cli

import (
"k8s.io/utils/env"

"github.com/seldonio/seldon-core/operator/v2/pkg/cli"
"github.com/spf13/cobra"
)

func createUnload() *cobra.Command {
cmd := &cobra.Command{
Use: "unload resources",
Short: "unload resources",
Long: `unload resources`,
RunE: func(cmd *cobra.Command, args []string) error {
flags := cmd.Flags()

schedulerHostIsSet := flags.Changed(flagSchedulerHost)
schedulerHost, err := flags.GetString(flagSchedulerHost)
if err != nil {
return err
}
authority, err := flags.GetString(flagAuthority)
if err != nil {
return err
}
showRequest, err := flags.GetBool(flagShowRequest)
if err != nil {
return err
}
showResponse, err := flags.GetBool(flagShowResponse)
if err != nil {
return err
}
filename, err := flags.GetString(flagFile)
if err != nil {
return err
}
var dataFile []byte
if filename != "" {
dataFile = loadFile(filename)
}

schedulerClient, err := cli.NewSchedulerClient(schedulerHost, schedulerHostIsSet, authority)
if err != nil {
return err
}

err = schedulerClient.Unload(dataFile, showRequest, showResponse)
sakoush marked this conversation as resolved.
Show resolved Hide resolved
return err
},
}

flags := cmd.Flags()
flags.BoolP(flagShowRequest, "r", false, "show request")
flags.BoolP(flagShowResponse, "o", false, "show response")
flags.String(flagSchedulerHost, env.GetString(envScheduler, defaultSchedulerHost), helpSchedulerHost)
flags.String(flagAuthority, "", helpAuthority)
flags.StringP(flagFile, "f", "", "model manifest file (YAML)")

return cmd
}
35 changes: 35 additions & 0 deletions operator/config/crd/bases/mlops.seldon.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,42 @@ spec:
spec:
description: PipelineSpec defines the desired state of Pipeline
properties:
input:
description: External inputs to this pipeline, optional
properties:
externalInputs:
description: Previous external pipeline steps to receive data
from
items:
type: string
type: array
externalTriggers:
description: Triggers required to activate inputs
items:
type: string
type: array
joinType:
description: One of inner (default), outer, or any (see above
for details)
type: string
joinWindowMs:
description: msecs to wait for messages from multiple inputs to
arrive before joining the inputs
format: int32
type: integer
tensorMap:
additionalProperties:
type: string
description: Map of tensor name conversions to use e.g. output1
-> input1
type: object
triggersJoinType:
description: One of inner (default), outer, or any (see above
for details)
type: string
type: object
output:
description: Synchronous output from this pipeline, optional
properties:
joinWindowMs:
description: msecs to wait for messages from multiple inputs to
Expand Down
142 changes: 142 additions & 0 deletions operator/pkg/cli/generic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package cli

import (
"bytes"
"errors"
"fmt"
"io"
"os"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/yaml"
)

type seldonKind int64

const (
Undefined seldonKind = iota
model
pipeline
experiment
)

var kindMap = map[string]seldonKind{
"Model": model,
"Pipeline": pipeline,
"Experiment": experiment,
}

type k8sResource struct {
kind seldonKind
name string
data []byte
}

func createDecoder(data []byte) *yaml.YAMLOrJSONDecoder {
var reader io.Reader
if len(data) > 0 {
reader = bytes.NewReader(data)
} else {
reader = io.Reader(os.Stdin)
}
dec := yaml.NewYAMLOrJSONDecoder(reader, 10)
return dec
}

func getNextResource(dec *yaml.YAMLOrJSONDecoder) (*k8sResource, bool, error) {
unstructuredObject := &unstructured.Unstructured{}
if err := dec.Decode(unstructuredObject); err != nil {
if errors.Is(err, io.EOF) {
return nil, false, nil
}
return nil, false, err
}
// Get the resource kind nad original bytes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Get the resource kind nad original bytes
// Get the resource kind and original bytes

gvk := unstructuredObject.GroupVersionKind()
data, err := unstructuredObject.MarshalJSON()
if err != nil {
return nil, false, err
}
if kind, ok := kindMap[gvk.Kind]; ok {
return &k8sResource{
kind: kind,
name: unstructuredObject.GetName(),
data: data,
}, true, nil
} else {
return nil, false, fmt.Errorf("Unknown Seldon Kind %s - only Model, Pipeline and Experiment allowed", gvk.Kind)
}
}

func (sc *SchedulerClient) Load(data []byte, showRequest bool, showResponse bool) error {
dec := createDecoder(data)
for {
resource, keepGoing, err := getNextResource(dec)
if err != nil {
return err
}
if !keepGoing {
return err
}
switch resource.kind {
case model:
err = sc.LoadModel(resource.data, showRequest, showResponse)
case pipeline:
err = sc.LoadPipeline(resource.data, showRequest, showResponse)
case experiment:
err = sc.StartExperiment(resource.data, showRequest, showResponse)
}
if err != nil {
return err
}
}
}

func (sc *SchedulerClient) Unload(data []byte, showRequest bool, showResponse bool) error {
dec := createDecoder(data)
for {
resource, keepGoing, err := getNextResource(dec)
if !keepGoing {
return err
}
switch resource.kind {
case model:
err = sc.UnloadModel("", resource.data, showRequest, showResponse)
case pipeline:
err = sc.UnloadPipeline("", resource.data, showRequest, showResponse)
case experiment:
err = sc.StopExperiment("", resource.data, showRequest, showResponse)
}
if err != nil {
return err
}
}
}

func (sc *SchedulerClient) Status(data []byte, showRequest bool, showResponse bool, wait bool) error {
dec := createDecoder(data)
for {
resource, keepGoing, err := getNextResource(dec)
if !keepGoing {
return err
}
waitCondition := ""
switch resource.kind {
case model:
if wait {
waitCondition = "ModelAvailable"
}
err = sc.ModelStatus(resource.name, showRequest, showResponse, waitCondition)
case pipeline:
if wait {
waitCondition = "PipelineReady"
}
err = sc.PipelineStatus(resource.name, showRequest, showResponse, waitCondition)
case experiment:
err = sc.ExperimentStatus(resource.name, showRequest, showResponse, wait)
}
if err != nil {
return err
}
}
}
Loading