From fb1077cf8866d6c731b49b82ae524fd6a661d270 Mon Sep 17 00:00:00 2001 From: vishal Date: Wed, 14 Oct 2020 12:07:43 -0400 Subject: [PATCH 1/4] Standardize API responses --- cli/cluster/delete.go | 6 +- cli/cluster/deploy.go | 12 +- cli/cluster/get.go | 16 +-- cli/cmd/cluster.go | 31 ++--- cli/cmd/deploy.go | 14 +-- cli/cmd/get.go | 106 +++++++++++------- cli/cmd/lib_batch_apis.go | 4 +- cli/cmd/lib_realtime_apis.go | 24 ++-- cli/cmd/lib_traffic_splitters.go | 26 +++-- cli/cmd/predict.go | 19 ++-- cli/cmd/refresh.go | 4 +- cli/local/api.go | 19 +++- cli/local/deploy.go | 25 ++--- cli/local/get.go | 49 ++++---- pkg/operator/resources/batchapi/api.go | 36 +++--- pkg/operator/resources/realtimeapi/api.go | 36 +++--- pkg/operator/resources/resources.go | 77 +++++++++---- pkg/operator/resources/trafficsplitter/api.go | 20 ++-- pkg/operator/schema/schema.go | 46 ++------ 19 files changed, 293 insertions(+), 277 deletions(-) diff --git a/cli/cluster/delete.go b/cli/cluster/delete.go index 23aade3fc1..0ece9b9b93 100644 --- a/cli/cluster/delete.go +++ b/cli/cluster/delete.go @@ -60,16 +60,16 @@ func getReadyRealtimeAPIReplicasOrNil(operatorConfig OperatorConfig, apiName str return nil } - var apiRes schema.GetAPIResponse + var apiRes schema.APIResponse if err = json.Unmarshal(httpRes, &apiRes); err != nil { return nil } - if apiRes.RealtimeAPI == nil { + if apiRes.Status == nil { return nil } - totalReady := apiRes.RealtimeAPI.Status.Updated.Ready + apiRes.RealtimeAPI.Status.Stale.Ready + totalReady := apiRes.Status.Updated.Ready + apiRes.Status.Stale.Ready return &totalReady } diff --git a/cli/cluster/deploy.go b/cli/cluster/deploy.go index 4529965ea7..2f4c148752 100644 --- a/cli/cluster/deploy.go +++ b/cli/cluster/deploy.go @@ -25,7 +25,7 @@ import ( "github.com/cortexlabs/cortex/pkg/operator/schema" ) -func Deploy(operatorConfig OperatorConfig, configPath string, deploymentBytesMap map[string][]byte, force bool) (schema.DeployResponse, error) { +func Deploy(operatorConfig OperatorConfig, configPath string, deploymentBytesMap map[string][]byte, force bool) ([]schema.DeployResult, error) { params := map[string]string{ "force": s.Bool(force), "configFileName": filepath.Base(configPath), @@ -36,13 +36,13 @@ func Deploy(operatorConfig OperatorConfig, configPath string, deploymentBytesMap response, err := HTTPUpload(operatorConfig, "/deploy", uploadInput, params) if err != nil { - return schema.DeployResponse{}, err + return nil, err } - var deployResponse schema.DeployResponse - if err := json.Unmarshal(response, &deployResponse); err != nil { - return schema.DeployResponse{}, errors.Wrap(err, "/deploy", string(response)) + var deployResults []schema.DeployResult + if err := json.Unmarshal(response, &deployResults); err != nil { + return nil, errors.Wrap(err, "/deploy", string(response)) } - return deployResponse, nil + return deployResults, nil } diff --git a/cli/cluster/get.go b/cli/cluster/get.go index dab4a4d0fc..2afc54ab8f 100644 --- a/cli/cluster/get.go +++ b/cli/cluster/get.go @@ -24,28 +24,28 @@ import ( "github.com/cortexlabs/cortex/pkg/operator/schema" ) -func GetAPIs(operatorConfig OperatorConfig) (schema.GetAPIsResponse, error) { +func GetAPIs(operatorConfig OperatorConfig) ([]schema.APIResponse, error) { httpRes, err := HTTPGet(operatorConfig, "/get") if err != nil { - return schema.GetAPIsResponse{}, err + return nil, err } - var apisRes schema.GetAPIsResponse + var apisRes []schema.APIResponse if err = json.Unmarshal(httpRes, &apisRes); err != nil { - return schema.GetAPIsResponse{}, errors.Wrap(err, "/get", string(httpRes)) + return nil, errors.Wrap(err, "/get", string(httpRes)) } return apisRes, nil } -func GetAPI(operatorConfig OperatorConfig, apiName string) (schema.GetAPIResponse, error) { +func GetAPI(operatorConfig OperatorConfig, apiName string) ([]schema.APIResponse, error) { httpRes, err := HTTPGet(operatorConfig, "/get/"+apiName) if err != nil { - return schema.GetAPIResponse{}, err + return nil, err } - var apiRes schema.GetAPIResponse + var apiRes []schema.APIResponse if err = json.Unmarshal(httpRes, &apiRes); err != nil { - return schema.GetAPIResponse{}, errors.Wrap(err, "/get/"+apiName, string(httpRes)) + return nil, errors.Wrap(err, "/get/"+apiName, string(httpRes)) } return apiRes, nil diff --git a/cli/cmd/cluster.go b/cli/cmd/cluster.go index 6bfce5e33f..22238abada 100644 --- a/cli/cmd/cluster.go +++ b/cli/cmd/cluster.go @@ -45,7 +45,6 @@ import ( "github.com/cortexlabs/cortex/pkg/types" "github.com/cortexlabs/cortex/pkg/types/clusterconfig" "github.com/cortexlabs/cortex/pkg/types/clusterstate" - "github.com/cortexlabs/cortex/pkg/types/spec" "github.com/cortexlabs/cortex/pkg/types/userconfig" "github.com/spf13/cobra" ) @@ -623,21 +622,7 @@ var _exportCmd = &cobra.Command{ exit.Error(err) } - var apiSpecs []spec.API - - for _, batchAPI := range apisResponse.BatchAPIs { - apiSpecs = append(apiSpecs, batchAPI.Spec) - } - - for _, realtimeAPI := range apisResponse.RealtimeAPIs { - apiSpecs = append(apiSpecs, realtimeAPI.Spec) - } - - for _, trafficSplitter := range apisResponse.TrafficSplitters { - apiSpecs = append(apiSpecs, trafficSplitter.Spec) - } - - if len(apiSpecs) == 0 { + if len(apisResponse) == 0 { fmt.Println(fmt.Sprintf("no apis found in cluster named %s in %s", *accessConfig.ClusterName, *accessConfig.Region)) exit.Ok() } @@ -649,24 +634,24 @@ var _exportCmd = &cobra.Command{ exit.Error(err) } - for _, apiSpec := range apiSpecs { - baseDir := filepath.Join(exportPath, apiSpec.Name) + for _, apiResponse := range apisResponse { + baseDir := filepath.Join(exportPath, apiResponse.Spec.Name) - fmt.Println(fmt.Sprintf("exporting %s to %s", apiSpec.Name, baseDir)) + fmt.Println(fmt.Sprintf("exporting %s to %s", apiResponse.Spec.Name, baseDir)) err = files.CreateDir(baseDir) if err != nil { exit.Error(err) } - err = awsClient.DownloadFileFromS3(info.ClusterConfig.Bucket, apiSpec.RawAPIKey(info.ClusterConfig.ClusterName), path.Join(baseDir, apiSpec.FileName)) + err = awsClient.DownloadFileFromS3(info.ClusterConfig.Bucket, apiResponse.Spec.RawAPIKey(info.ClusterConfig.ClusterName), path.Join(baseDir, apiResponse.Spec.FileName)) if err != nil { exit.Error(err) } - if apiSpec.Kind != userconfig.TrafficSplitterKind { - zipFileLocation := path.Join(baseDir, path.Base(apiSpec.ProjectKey)) - err = awsClient.DownloadFileFromS3(info.ClusterConfig.Bucket, apiSpec.ProjectKey, zipFileLocation) + if apiResponse.Spec.Kind != userconfig.TrafficSplitterKind { + zipFileLocation := path.Join(baseDir, path.Base(apiResponse.Spec.ProjectKey)) + err = awsClient.DownloadFileFromS3(info.ClusterConfig.Bucket, apiResponse.Spec.ProjectKey, zipFileLocation) if err != nil { exit.Error(err) } diff --git a/cli/cmd/deploy.go b/cli/cmd/deploy.go index ec3cc55d83..042667d05a 100644 --- a/cli/cmd/deploy.go +++ b/cli/cmd/deploy.go @@ -89,14 +89,14 @@ var _deployCmd = &cobra.Command{ exit.Error(ErrorDeployFromTopLevelDir("root", env.Provider)) } - var deployResponse schema.DeployResponse + var deployResults []schema.DeployResult if env.Provider == types.AWSProviderType { deploymentBytes, err := getDeploymentBytes(env.Provider, configPath) if err != nil { exit.Error(err) } - deployResponse, err = cluster.Deploy(MustGetOperatorConfig(env.Name), configPath, deploymentBytes, _flagDeployForce) + deployResults, err = cluster.Deploy(MustGetOperatorConfig(env.Name), configPath, deploymentBytes, _flagDeployForce) if err != nil { exit.Error(err) } @@ -106,14 +106,14 @@ var _deployCmd = &cobra.Command{ exit.Error(err) } - deployResponse, err = local.Deploy(env, configPath, projectFiles, _flagDeployDisallowPrompt) + deployResults, err = local.Deploy(env, configPath, projectFiles, _flagDeployDisallowPrompt) if err != nil { exit.Error(err) } } if _flagOutput == flags.JSONOutputType { - bytes, err := libjson.Marshal(deployResponse) + bytes, err := libjson.Marshal(deployResults) if err != nil { exit.Error(err) } @@ -121,7 +121,7 @@ var _deployCmd = &cobra.Command{ return } - message := deployMessage(deployResponse.Results, env.Name) + message := deployMessage(deployResults, env.Name) print.BoldFirstBlock(message) }, } @@ -292,7 +292,7 @@ func didAllResultsError(results []schema.DeployResult) bool { func getAPICommandsMessage(results []schema.DeployResult, envName string) string { apiName := "" if len(results) == 1 { - apiName = results[0].API.Name + apiName = results[0].API.Spec.Name } defaultEnv := getDefaultEnv(_generalCommandType) @@ -309,7 +309,7 @@ func getAPICommandsMessage(results []schema.DeployResult, envName string) string if len(result.Error) > 0 { continue } - if result.API.API != nil && result.API.Kind == userconfig.RealtimeAPIKind { + if result.API != nil && result.API.Spec.Kind == userconfig.RealtimeAPIKind { items.Add(fmt.Sprintf("cortex logs %s%s", apiName, envArg), "(stream api logs)") break } diff --git a/cli/cmd/get.go b/cli/cmd/get.go index 670e3753f7..bf4e277ebe 100644 --- a/cli/cmd/get.go +++ b/cli/cmd/get.go @@ -34,6 +34,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/telemetry" "github.com/cortexlabs/cortex/pkg/operator/schema" "github.com/cortexlabs/cortex/pkg/types" + "github.com/cortexlabs/cortex/pkg/types/userconfig" "github.com/spf13/cobra" ) @@ -167,17 +168,17 @@ func getAPIsInAllEnvironments() (string, error) { return "", err } - var allRealtimeAPIs []schema.RealtimeAPI + var allRealtimeAPIs []schema.APIResponse var allRealtimeAPIEnvs []string - var allBatchAPIs []schema.BatchAPI + var allBatchAPIs []schema.APIResponse var allBatchAPIEnvs []string - var allTrafficSplitters []schema.TrafficSplitter + var allTrafficSplitters []schema.APIResponse var allTrafficSplitterEnvs []string type getAPIsOutput struct { - EnvName string `json:"env_name"` - Response schema.GetAPIsResponse `json:"response"` - Error string `json:"error"` + EnvName string `json:"env_name"` + APIs []schema.APIResponse `json:"apis"` + Error string `json:"error"` } allAPIsOutput := []getAPIsOutput{} @@ -185,7 +186,7 @@ func getAPIsInAllEnvironments() (string, error) { errorsMap := map[string]error{} // get apis from both environments for _, env := range cliConfig.Environments { - var apisRes schema.GetAPIsResponse + var apisRes []schema.APIResponse var err error if env.Provider == types.AWSProviderType { apisRes, err = cluster.GetAPIs(MustGetOperatorConfig(env.Name)) @@ -194,23 +195,24 @@ func getAPIsInAllEnvironments() (string, error) { } apisOutput := getAPIsOutput{ - EnvName: env.Name, - Response: apisRes, + EnvName: env.Name, + APIs: apisRes, } if err == nil { - for range apisRes.BatchAPIs { - allBatchAPIEnvs = append(allBatchAPIEnvs, env.Name) - } - for range apisRes.RealtimeAPIs { - allRealtimeAPIEnvs = append(allRealtimeAPIEnvs, env.Name) - } - for range apisRes.TrafficSplitters { - allTrafficSplitterEnvs = append(allTrafficSplitterEnvs, env.Name) + for _, api := range apisRes { + switch api.Spec.Kind { + case userconfig.BatchAPIKind: + allBatchAPIEnvs = append(allBatchAPIEnvs, env.Name) + allBatchAPIs = append(allBatchAPIs, api) + case userconfig.RealtimeAPIKind: + allRealtimeAPIEnvs = append(allRealtimeAPIEnvs, env.Name) + allRealtimeAPIs = append(allRealtimeAPIs, api) + case userconfig.TrafficSplitterKind: + allTrafficSplitterEnvs = append(allTrafficSplitterEnvs, env.Name) + allTrafficSplitters = append(allTrafficSplitters, api) + } } - allRealtimeAPIs = append(allRealtimeAPIs, apisRes.RealtimeAPIs...) - allBatchAPIs = append(allBatchAPIs, apisRes.BatchAPIs...) - allTrafficSplitters = append(allTrafficSplitters, apisRes.TrafficSplitters...) } else { apisOutput.Error = err.Error() errorsMap[env.Name] = err @@ -271,6 +273,7 @@ func getAPIsInAllEnvironments() (string, error) { if len(allBatchAPIs) > 0 { out += "\n" + } out += t.MustFormat() @@ -312,7 +315,7 @@ func hideReplicaCountColumns(t *table.Table) { } func getAPIsByEnv(env cliconfig.Environment, printEnv bool) (string, error) { - var apisRes schema.GetAPIsResponse + var apisRes []schema.APIResponse var err error if env.Provider == types.AWSProviderType { @@ -343,7 +346,22 @@ func getAPIsByEnv(env cliconfig.Environment, printEnv bool) (string, error) { } } - if len(apisRes.RealtimeAPIs) == 0 && len(apisRes.BatchAPIs) == 0 && len(apisRes.TrafficSplitters) == 0 { + var allRealtimeAPIs []schema.APIResponse + var allBatchAPIs []schema.APIResponse + var allTrafficSplitters []schema.APIResponse + + for _, api := range apisRes { + switch api.Spec.Kind { + case userconfig.BatchAPIKind: + allBatchAPIs = append(allBatchAPIs, api) + case userconfig.RealtimeAPIKind: + allRealtimeAPIs = append(allRealtimeAPIs, api) + case userconfig.TrafficSplitterKind: + allTrafficSplitters = append(allTrafficSplitters, api) + } + } + + if len(allRealtimeAPIs) == 0 && len(allBatchAPIs) == 0 && len(allTrafficSplitters) == 0 { mismatchedAPIMessage, err := getLocalVersionMismatchedAPIsMessage() if err == nil && len(mismatchedAPIMessage) > 0 { return console.Bold("no apis are deployed") + "\n\n" + mismatchedAPIMessage, nil @@ -353,28 +371,28 @@ func getAPIsByEnv(env cliconfig.Environment, printEnv bool) (string, error) { out := "" - if len(apisRes.BatchAPIs) > 0 { + if len(allBatchAPIs) > 0 { envNames := []string{} - for range apisRes.BatchAPIs { + for range allBatchAPIs { envNames = append(envNames, env.Name) } - t := batchAPIsTable(apisRes.BatchAPIs, envNames) + t := batchAPIsTable(allBatchAPIs, envNames) t.FindHeaderByTitle(_titleEnvironment).Hidden = true out += t.MustFormat() } - if len(apisRes.RealtimeAPIs) > 0 { + if len(allRealtimeAPIs) > 0 { envNames := []string{} - for range apisRes.RealtimeAPIs { + for range allRealtimeAPIs { envNames = append(envNames, env.Name) } - t := realtimeAPIsTable(apisRes.RealtimeAPIs, envNames) + t := realtimeAPIsTable(allRealtimeAPIs, envNames) t.FindHeaderByTitle(_titleEnvironment).Hidden = true - if len(apisRes.BatchAPIs) > 0 { + if len(allBatchAPIs) > 0 { out += "\n" } @@ -385,16 +403,16 @@ func getAPIsByEnv(env cliconfig.Environment, printEnv bool) (string, error) { out += t.MustFormat() } - if len(apisRes.TrafficSplitters) > 0 { + if len(allTrafficSplitters) > 0 { envNames := []string{} - for range apisRes.TrafficSplitters { + for range allTrafficSplitters { envNames = append(envNames, env.Name) } - t := trafficSplitterListTable(apisRes.TrafficSplitters, envNames) + t := trafficSplitterListTable(allTrafficSplitters, envNames) t.FindHeaderByTitle(_titleEnvironment).Hidden = true - if len(apisRes.BatchAPIs) > 0 || len(apisRes.RealtimeAPIs) > 0 { + if len(allBatchAPIs) > 0 || len(allRealtimeAPIs) > 0 { out += "\n" } @@ -428,42 +446,44 @@ func getLocalVersionMismatchedAPIsMessage() (string, error) { func getAPI(env cliconfig.Environment, apiName string) (string, error) { if env.Provider == types.AWSProviderType { - apiRes, err := cluster.GetAPI(MustGetOperatorConfig(env.Name), apiName) + apisRes, err := cluster.GetAPI(MustGetOperatorConfig(env.Name), apiName) if err != nil { return "", err } if _flagOutput == flags.JSONOutputType { - bytes, err := libjson.Marshal(apiRes) + bytes, err := libjson.Marshal(apisRes) if err != nil { return "", err } return string(bytes), nil } - if apiRes.RealtimeAPI != nil { - return realtimeAPITable(apiRes.RealtimeAPI, env) + apiRes := apisRes[0] + + if apiRes.Spec.Kind == userconfig.RealtimeAPIKind { + return realtimeAPITable(apiRes, env) } - if apiRes.TrafficSplitter != nil { - return trafficSplitterTable(apiRes.TrafficSplitter, env) + if apiRes.Spec.Kind != userconfig.TrafficSplitterKind { + return trafficSplitterTable(apiRes, env) } - return batchAPITable(*apiRes.BatchAPI), nil + return batchAPITable(apiRes), nil } - apiRes, err := local.GetAPI(apiName) + apisRes, err := local.GetAPI(apiName) if err != nil { return "", err } if _flagOutput == flags.JSONOutputType { - bytes, err := libjson.Marshal(apiRes) + bytes, err := libjson.Marshal(apisRes) if err != nil { return "", err } return string(bytes), nil } - return realtimeAPITable(apiRes.RealtimeAPI, env) + return realtimeAPITable(apisRes[0], env) } func titleStr(title string) string { diff --git a/cli/cmd/lib_batch_apis.go b/cli/cmd/lib_batch_apis.go index 1d557a371b..30b8a9af2e 100644 --- a/cli/cmd/lib_batch_apis.go +++ b/cli/cmd/lib_batch_apis.go @@ -41,7 +41,7 @@ const ( _timeFormat = "02 Jan 2006 15:04:05 MST" ) -func batchAPIsTable(batchAPIs []schema.BatchAPI, envNames []string) table.Table { +func batchAPIsTable(batchAPIs []schema.APIResponse, envNames []string) table.Table { rows := make([][]interface{}, 0, len(batchAPIs)) for i, batchAPI := range batchAPIs { @@ -82,7 +82,7 @@ func batchAPIsTable(batchAPIs []schema.BatchAPI, envNames []string) table.Table } } -func batchAPITable(batchAPI schema.BatchAPI) string { +func batchAPITable(batchAPI schema.APIResponse) string { jobRows := make([][]interface{}, 0, len(batchAPI.JobStatuses)) out := "" diff --git a/cli/cmd/lib_realtime_apis.go b/cli/cmd/lib_realtime_apis.go index b00131c48a..987ddc4b53 100644 --- a/cli/cmd/lib_realtime_apis.go +++ b/cli/cmd/lib_realtime_apis.go @@ -41,10 +41,10 @@ import ( "github.com/cortexlabs/cortex/pkg/types/userconfig" ) -func realtimeAPITable(realtimeAPI *schema.RealtimeAPI, env cliconfig.Environment) (string, error) { +func realtimeAPITable(realtimeAPI schema.APIResponse, env cliconfig.Environment) (string, error) { var out string - t := realtimeAPIsTable([]schema.RealtimeAPI{*realtimeAPI}, []string{env.Name}) + t := realtimeAPIsTable([]schema.APIResponse{realtimeAPI}, []string{env.Name}) t.FindHeaderByTitle(_titleEnvironment).Hidden = true t.FindHeaderByTitle(_titleRealtimeAPI).Hidden = true if env.Provider == types.LocalProviderType { @@ -56,14 +56,14 @@ func realtimeAPITable(realtimeAPI *schema.RealtimeAPI, env cliconfig.Environment if env.Provider != types.LocalProviderType && realtimeAPI.Spec.Monitoring != nil { switch realtimeAPI.Spec.Monitoring.ModelType { case userconfig.ClassificationModelType: - out += "\n" + classificationMetricsStr(&realtimeAPI.Metrics) + out += "\n" + classificationMetricsStr(realtimeAPI.Metrics) case userconfig.RegressionModelType: - out += "\n" + regressionMetricsStr(&realtimeAPI.Metrics) + out += "\n" + regressionMetricsStr(realtimeAPI.Metrics) } } - if realtimeAPI.DashboardURL != "" { - out += "\n" + console.Bold("metrics dashboard: ") + realtimeAPI.DashboardURL + "\n" + if realtimeAPI.DashboardURL != nil { + out += "\n" + console.Bold("metrics dashboard: ") + *realtimeAPI.DashboardURL + "\n" } out += "\n" + console.Bold("endpoint: ") + realtimeAPI.Endpoint @@ -71,7 +71,7 @@ func realtimeAPITable(realtimeAPI *schema.RealtimeAPI, env cliconfig.Environment out += fmt.Sprintf("\n%s curl %s -X POST -H \"Content-Type: application/json\" -d @sample.json\n", console.Bold("example curl:"), realtimeAPI.Endpoint) if realtimeAPI.Spec.Predictor.Type == userconfig.TensorFlowPredictorType || realtimeAPI.Spec.Predictor.Type == userconfig.ONNXPredictorType { - out += "\n" + describeModelInput(&realtimeAPI.Status, realtimeAPI.Endpoint) + out += "\n" + describeModelInput(realtimeAPI.Status, realtimeAPI.Endpoint) } out += titleStr("configuration") + strings.TrimSpace(realtimeAPI.Spec.UserStr(env.Provider)) @@ -79,7 +79,7 @@ func realtimeAPITable(realtimeAPI *schema.RealtimeAPI, env cliconfig.Environment return out, nil } -func realtimeAPIsTable(realtimeAPIs []schema.RealtimeAPI, envNames []string) table.Table { +func realtimeAPIsTable(realtimeAPIs []schema.APIResponse, envNames []string) table.Table { rows := make([][]interface{}, 0, len(realtimeAPIs)) var totalFailed int32 @@ -98,10 +98,10 @@ func realtimeAPIsTable(realtimeAPIs []schema.RealtimeAPI, envNames []string) tab realtimeAPI.Status.Requested, realtimeAPI.Status.Updated.TotalFailed(), libtime.SinceStr(&lastUpdated), - latencyStr(&realtimeAPI.Metrics), - code2XXStr(&realtimeAPI.Metrics), - code4XXStr(&realtimeAPI.Metrics), - code5XXStr(&realtimeAPI.Metrics), + latencyStr(realtimeAPI.Metrics), + code2XXStr(realtimeAPI.Metrics), + code4XXStr(realtimeAPI.Metrics), + code5XXStr(realtimeAPI.Metrics), }) totalFailed += realtimeAPI.Status.Updated.TotalFailed() diff --git a/cli/cmd/lib_traffic_splitters.go b/cli/cmd/lib_traffic_splitters.go index 276eee55da..16187e91bf 100644 --- a/cli/cmd/lib_traffic_splitters.go +++ b/cli/cmd/lib_traffic_splitters.go @@ -36,12 +36,12 @@ const ( _titleAPIs = "apis" ) -func trafficSplitterTable(trafficSplitter *schema.TrafficSplitter, env cliconfig.Environment) (string, error) { +func trafficSplitterTable(trafficSplitter schema.APIResponse, env cliconfig.Environment) (string, error) { var out string lastUpdated := time.Unix(trafficSplitter.Spec.LastUpdated, 0) - t, err := trafficSplitTable(*trafficSplitter, env) + t, err := trafficSplitTable(trafficSplitter, env) if err != nil { return "", err } @@ -58,25 +58,27 @@ func trafficSplitterTable(trafficSplitter *schema.TrafficSplitter, env cliconfig return out, nil } -func trafficSplitTable(trafficSplitter schema.TrafficSplitter, env cliconfig.Environment) (table.Table, error) { +func trafficSplitTable(trafficSplitter schema.APIResponse, env cliconfig.Environment) (table.Table, error) { rows := make([][]interface{}, 0, len(trafficSplitter.Spec.APIs)) for _, api := range trafficSplitter.Spec.APIs { - apiRes, err := cluster.GetAPI(MustGetOperatorConfig(env.Name), api.Name) + apisRes, err := cluster.GetAPI(MustGetOperatorConfig(env.Name), api.Name) if err != nil { return table.Table{}, err } - lastUpdated := time.Unix(apiRes.RealtimeAPI.Spec.LastUpdated, 0) + + apiRes := apisRes[0] + lastUpdated := time.Unix(apiRes.Spec.LastUpdated, 0) rows = append(rows, []interface{}{ env.Name, - apiRes.RealtimeAPI.Spec.Name, + apiRes.Spec.Name, api.Weight, - apiRes.RealtimeAPI.Status.Message(), - apiRes.RealtimeAPI.Status.Requested, + apiRes.Status.Message(), + apiRes.Status.Requested, libtime.SinceStr(&lastUpdated), - latencyStr(&apiRes.RealtimeAPI.Metrics), - code2XXStr(&apiRes.RealtimeAPI.Metrics), - code5XXStr(&apiRes.RealtimeAPI.Metrics), + latencyStr(apiRes.Metrics), + code2XXStr(apiRes.Metrics), + code5XXStr(apiRes.Metrics), }) } @@ -96,7 +98,7 @@ func trafficSplitTable(trafficSplitter schema.TrafficSplitter, env cliconfig.Env }, nil } -func trafficSplitterListTable(trafficSplitter []schema.TrafficSplitter, envNames []string) table.Table { +func trafficSplitterListTable(trafficSplitter []schema.APIResponse, envNames []string) table.Table { rows := make([][]interface{}, 0, len(trafficSplitter)) for i, splitAPI := range trafficSplitter { lastUpdated := time.Unix(splitAPI.Spec.LastUpdated, 0) diff --git a/cli/cmd/predict.go b/cli/cmd/predict.go index 52c679e68a..31721bbe0b 100644 --- a/cli/cmd/predict.go +++ b/cli/cmd/predict.go @@ -30,6 +30,7 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/telemetry" "github.com/cortexlabs/cortex/pkg/operator/schema" "github.com/cortexlabs/cortex/pkg/types" + "github.com/cortexlabs/cortex/pkg/types/userconfig" "github.com/spf13/cobra" ) @@ -62,31 +63,31 @@ var _predictCmd = &cobra.Command{ apiName := args[0] jsonPath := args[1] - var apiRes schema.GetAPIResponse + var apisRes []schema.APIResponse if env.Provider == types.AWSProviderType { - apiRes, err = cluster.GetAPI(MustGetOperatorConfig(env.Name), apiName) + apisRes, err = cluster.GetAPI(MustGetOperatorConfig(env.Name), apiName) if err != nil { exit.Error(err) } } else { - apiRes, err = local.GetAPI(apiName) + apisRes, err = local.GetAPI(apiName) if err != nil { exit.Error(err) } } - if apiRes.RealtimeAPI == nil { + apiRes := apisRes[0] + + if apiRes.Spec.Kind != userconfig.RealtimeAPIKind { exit.Error(errors.ErrorUnexpected("unable to get api", apiName)) // unexpected } - realtimeAPI := apiRes.RealtimeAPI - - totalReady := realtimeAPI.Status.Updated.Ready + realtimeAPI.Status.Stale.Ready + totalReady := apiRes.Status.Updated.Ready + apiRes.Status.Stale.Ready if totalReady == 0 { - exit.Error(ErrorAPINotReady(apiName, realtimeAPI.Status.Message())) + exit.Error(ErrorAPINotReady(apiName, apiRes.Status.Message())) } - predictResponse, err := makePredictRequest(realtimeAPI.Endpoint, jsonPath) + predictResponse, err := makePredictRequest(apiRes.Endpoint, jsonPath) if err != nil { exit.Error(err) } diff --git a/cli/cmd/refresh.go b/cli/cmd/refresh.go index ad828aa04f..b08b02d288 100644 --- a/cli/cmd/refresh.go +++ b/cli/cmd/refresh.go @@ -22,6 +22,7 @@ import ( "github.com/cortexlabs/cortex/cli/cluster" "github.com/cortexlabs/cortex/cli/types/flags" + "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/exit" libjson "github.com/cortexlabs/cortex/pkg/lib/json" "github.com/cortexlabs/cortex/pkg/lib/print" @@ -60,8 +61,7 @@ var _refreshCmd = &cobra.Command{ } if env.Provider == types.LocalProviderType { - print.BoldFirstLine("`cortex refresh` is not supported in the local environment; use `cortex deploy` instead") - return + exit.Error(errors.Append(ErrorNotSupportedInLocalEnvironment(), "; use `cortex deploy` instead")) } refreshResponse, err := cluster.Refresh(MustGetOperatorConfig(env.Name), args[0], _flagRefreshForce) if err != nil { diff --git a/cli/local/api.go b/cli/local/api.go index a784f663c6..6ed612dbe3 100644 --- a/cli/local/api.go +++ b/cli/local/api.go @@ -28,13 +28,14 @@ import ( "github.com/cortexlabs/cortex/pkg/lib/files" "github.com/cortexlabs/cortex/pkg/lib/prompt" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" + "github.com/cortexlabs/cortex/pkg/operator/schema" "github.com/cortexlabs/cortex/pkg/types/spec" "github.com/cortexlabs/cortex/pkg/types/userconfig" ) var _deploymentID = "local" -func UpdateAPI(apiConfig *userconfig.API, configPath string, projectID string, deployDisallowPrompt bool, awsClient *aws.Client) (*spec.API, string, error) { +func UpdateAPI(apiConfig *userconfig.API, configPath string, projectID string, deployDisallowPrompt bool, awsClient *aws.Client) (*schema.APIResponse, string, error) { var incompatibleVersion string encounteredVersionMismatch := false prevAPISpec, err := FindAPISpec(apiConfig.Name) @@ -84,7 +85,7 @@ func UpdateAPI(apiConfig *userconfig.API, configPath string, projectID string, d newAPISpec.LocalProjectDir = files.Dir(configPath) if areAPIsEqual(newAPISpec, prevAPISpec) { - return newAPISpec, fmt.Sprintf("%s is up to date", newAPISpec.Resource.UserString()), nil + return toAPIResponse(newAPISpec), fmt.Sprintf("%s is up to date", newAPISpec.Resource.UserString()), nil } if prevAPISpec != nil || len(prevAPIContainers) != 0 { @@ -112,16 +113,24 @@ func UpdateAPI(apiConfig *userconfig.API, configPath string, projectID string, d if prevAPISpec == nil && len(prevAPIContainers) == 0 { if encounteredVersionMismatch { - return newAPISpec, fmt.Sprintf( + return toAPIResponse(newAPISpec), fmt.Sprintf( "creating api %s with current CLI version %s", newAPISpec.Name, consts.CortexVersion, ), nil } - return newAPISpec, fmt.Sprintf("creating %s", newAPISpec.Resource.UserString()), nil + + return toAPIResponse(newAPISpec), fmt.Sprintf("creating %s", newAPISpec.Resource.UserString()), nil } - return newAPISpec, fmt.Sprintf("updating %s", newAPISpec.Resource.UserString()), nil + return toAPIResponse(newAPISpec), fmt.Sprintf("updating %s", newAPISpec.Resource.UserString()), nil +} + +func toAPIResponse(api *spec.API) *schema.APIResponse { + return &schema.APIResponse{ + Spec: *api, + Endpoint: fmt.Sprintf("http://localhost:%d", api.Networking.LocalPort), + } } func writeAPISpec(apiSpec *spec.API) error { diff --git a/cli/local/deploy.go b/cli/local/deploy.go index 467a305d98..fb0dada6f1 100644 --- a/cli/local/deploy.go +++ b/cli/local/deploy.go @@ -31,55 +31,56 @@ import ( "github.com/cortexlabs/cortex/pkg/types/spec" ) -func Deploy(env cliconfig.Environment, configPath string, projectFileList []string, deployDisallowPrompt bool) (schema.DeployResponse, error) { +func Deploy(env cliconfig.Environment, configPath string, projectFileList []string, deployDisallowPrompt bool) ([]schema.DeployResult, error) { configFileName := filepath.Base(configPath) _, err := docker.GetDockerClient() if err != nil { - return schema.DeployResponse{}, err + return nil, err } configBytes, err := files.ReadFileBytes(configPath) if err != nil { - return schema.DeployResponse{}, err + return nil, err } projectFiles, err := newProjectFiles(projectFileList, configPath) if err != nil { - return schema.DeployResponse{}, err + return nil, err } var awsClient *aws.Client if env.AWSAccessKeyID != nil { awsClient, err = aws.NewFromCreds(*env.AWSRegion, *env.AWSAccessKeyID, *env.AWSSecretAccessKey) if err != nil { - return schema.DeployResponse{}, err + return nil, err } } else { awsClient, err = aws.NewAnonymousClient() if err != nil { - return schema.DeployResponse{}, err + return nil, err } } apiConfigs, err := spec.ExtractAPIConfigs(configBytes, types.LocalProviderType, configFileName, nil) if err != nil { - return schema.DeployResponse{}, err + return nil, err } err = ValidateLocalAPIs(apiConfigs, projectFiles, awsClient) if err != nil { err = errors.Append(err, fmt.Sprintf("\n\napi configuration schema for Realtime API can be found at https://docs.cortex.dev/v/%s/deployments/realtime-api/api-configuration", consts.CortexVersionMinor)) - return schema.DeployResponse{}, err + return nil, err } projectID, err := files.HashFile(projectFileList[0], projectFileList[1:]...) if err != nil { - return schema.DeployResponse{}, errors.Wrap(err, "failed to hash directory", filepath.Dir(configPath)) + return nil, errors.Wrap(err, "failed to hash directory", filepath.Dir(configPath)) } results := make([]schema.DeployResult, len(apiConfigs)) - for i, apiConfig := range apiConfigs { + for i := range apiConfigs { + apiConfig := apiConfigs[i] api, msg, err := UpdateAPI(&apiConfig, configPath, projectID, deployDisallowPrompt, awsClient) results[i].Message = msg if err != nil { @@ -89,7 +90,5 @@ func Deploy(env cliconfig.Environment, configPath string, projectFileList []stri } } - return schema.DeployResponse{ - Results: results, - }, nil + return results, nil } diff --git a/cli/local/get.go b/cli/local/get.go index 80ba3e971a..58cf433307 100644 --- a/cli/local/get.go +++ b/cli/local/get.go @@ -30,39 +30,40 @@ import ( "github.com/cortexlabs/cortex/pkg/types/spec" ) -func GetAPIs() (schema.GetAPIsResponse, error) { +func GetAPIs() ([]schema.APIResponse, error) { _, err := docker.GetDockerClient() if err != nil { - return schema.GetAPIsResponse{}, err + return nil, err } apiSpecList, err := ListAPISpecs() if err != nil { - return schema.GetAPIsResponse{}, err + return nil, err } - realtimeAPIs := make([]schema.RealtimeAPI, len(apiSpecList)) + apiResponses := make([]schema.APIResponse, len(apiSpecList)) for i, apiSpec := range apiSpecList { apiStatus, err := GetAPIStatus(&apiSpec) if err != nil { - return schema.GetAPIsResponse{}, err + return nil, err } metrics, err := GetAPIMetrics(&apiSpec) if err != nil { - return schema.GetAPIsResponse{}, err + return nil, err } - realtimeAPIs[i] = schema.RealtimeAPI{ - Spec: apiSpec, - Status: apiStatus, - Metrics: metrics, + apiPort := apiSpec.Networking.LocalPort + + apiResponses[i] = schema.APIResponse{ + Spec: apiSpec, + Status: &apiStatus, + Metrics: &metrics, + Endpoint: fmt.Sprintf("http://localhost:%d", *apiPort), } } - return schema.GetAPIsResponse{ - RealtimeAPIs: realtimeAPIs, - }, nil + return apiResponses, nil } func ListAPISpecs() ([]spec.API, error) { @@ -127,34 +128,34 @@ func ListVersionMismatchedAPIs() ([]string, error) { return apiNames, nil } -func GetAPI(apiName string) (schema.GetAPIResponse, error) { +func GetAPI(apiName string) ([]schema.APIResponse, error) { _, err := docker.GetDockerClient() if err != nil { - return schema.GetAPIResponse{}, err + return nil, err } apiSpec, err := FindAPISpec(apiName) if err != nil { - return schema.GetAPIResponse{}, err + return nil, err } apiStatus, err := GetAPIStatus(apiSpec) if err != nil { - return schema.GetAPIResponse{}, err + return nil, err } apiMetrics, err := GetAPIMetrics(apiSpec) if err != nil { - return schema.GetAPIResponse{}, err + return nil, err } containers, err := GetContainersByAPI(apiName) if err != nil { - return schema.GetAPIResponse{}, err + return nil, err } if len(containers) == 0 { - return schema.GetAPIResponse{}, ErrorAPIContainersNotFound(apiName) + return nil, ErrorAPIContainersNotFound(apiName) } apiContainer := containers[0] if len(containers) == 2 && apiContainer.Labels["type"] != "api" { @@ -163,11 +164,11 @@ func GetAPI(apiName string) (schema.GetAPIResponse, error) { apiPort := apiSpec.Networking.LocalPort - return schema.GetAPIResponse{ - RealtimeAPI: &schema.RealtimeAPI{ + return []schema.APIResponse{ + { Spec: *apiSpec, - Status: apiStatus, - Metrics: apiMetrics, + Status: &apiStatus, + Metrics: &apiMetrics, Endpoint: fmt.Sprintf("http://localhost:%d", *apiPort), }, }, nil diff --git a/pkg/operator/resources/batchapi/api.go b/pkg/operator/resources/batchapi/api.go index ba4d31be71..df5eae2c9c 100644 --- a/pkg/operator/resources/batchapi/api.go +++ b/pkg/operator/resources/batchapi/api.go @@ -169,8 +169,8 @@ func deleteS3Resources(apiName string) error { } // Returns all batch apis, for each API returning the most recently submitted job and all running jobs -func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService, k8sJobs []kbatch.Job, pods []kcore.Pod) ([]schema.BatchAPI, error) { - batchAPIsMap := map[string]*schema.BatchAPI{} +func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService, k8sJobs []kbatch.Job, pods []kcore.Pod) ([]schema.APIResponse, error) { + batchAPIsMap := map[string]*schema.APIResponse{} jobIDToK8sJobMap := map[string]*kbatch.Job{} for _, job := range k8sJobs { @@ -210,7 +210,7 @@ func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService, k8sJobs jobStatuses = append(jobStatuses, *jobStatus) } - batchAPIsMap[apiName] = &schema.BatchAPI{ + batchAPIsMap[apiName] = &schema.APIResponse{ Spec: *api, Endpoint: endpoint, JobStatuses: jobStatuses, @@ -245,7 +245,7 @@ func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService, k8sJobs } } - batchAPIList := make([]schema.BatchAPI, 0, len(batchAPIsMap)) + batchAPIList := make([]schema.APIResponse, 0, len(batchAPIsMap)) for _, batchAPI := range batchAPIsMap { batchAPIList = append(batchAPIList, *batchAPI) @@ -254,18 +254,18 @@ func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService, k8sJobs return batchAPIList, nil } -func GetAPIByName(deployedResource *operator.DeployedResource) (*schema.GetAPIResponse, error) { +func GetAPIByName(deployedResource *operator.DeployedResource) (schema.APIResponse, error) { virtualService := deployedResource.VirtualService apiID := virtualService.Labels["apiID"] api, err := operator.DownloadAPISpec(deployedResource.Name, apiID) if err != nil { - return nil, err + return schema.APIResponse{}, err } k8sJobs, err := config.K8s.ListJobsByLabel("apiName", deployedResource.Name) if err != nil { - return nil, err + return schema.APIResponse{}, err } jobIDToK8sJobMap := map[string]*kbatch.Job{} @@ -275,12 +275,12 @@ func GetAPIByName(deployedResource *operator.DeployedResource) (*schema.GetAPIRe endpoint, err := operator.APIEndpoint(api) if err != nil { - return nil, err + return schema.APIResponse{}, err } pods, err := config.K8s.ListPodsByLabel("apiName", deployedResource.Name) if err != nil { - return nil, err + return schema.APIResponse{}, err } jobIDToPodsMap := map[string][]kcore.Pod{} @@ -290,7 +290,7 @@ func GetAPIByName(deployedResource *operator.DeployedResource) (*schema.GetAPIRe inProgressJobKeys, err := listAllInProgressJobKeysByAPI(deployedResource.Name) if err != nil { - return nil, err + return schema.APIResponse{}, err } jobStatuses := []status.JobStatus{} @@ -298,7 +298,7 @@ func GetAPIByName(deployedResource *operator.DeployedResource) (*schema.GetAPIRe for _, jobKey := range inProgressJobKeys { jobStatus, err := getJobStatusFromK8sJob(jobKey, jobIDToK8sJobMap[jobKey.ID], jobIDToPodsMap[jobKey.ID]) if err != nil { - return nil, err + return schema.APIResponse{}, err } jobStatuses = append(jobStatuses, *jobStatus) @@ -308,7 +308,7 @@ func GetAPIByName(deployedResource *operator.DeployedResource) (*schema.GetAPIRe if len(jobStatuses) < 10 { jobStates, err := getMostRecentlySubmittedJobStates(deployedResource.Name, 10+len(jobStatuses)) if err != nil { - return nil, err + return schema.APIResponse{}, err } for _, jobState := range jobStates { if jobIDSet.Has(jobState.ID) { @@ -318,7 +318,7 @@ func GetAPIByName(deployedResource *operator.DeployedResource) (*schema.GetAPIRe jobStatus, err := getJobStatusFromJobState(jobState, nil, nil) if err != nil { - return nil, err + return schema.APIResponse{}, err } jobStatuses = append(jobStatuses, *jobStatus) @@ -328,11 +328,9 @@ func GetAPIByName(deployedResource *operator.DeployedResource) (*schema.GetAPIRe } } - return &schema.GetAPIResponse{ - BatchAPI: &schema.BatchAPI{ - Spec: *api, - JobStatuses: jobStatuses, - Endpoint: endpoint, - }, + return schema.APIResponse{ + Spec: *api, + JobStatuses: jobStatuses, + Endpoint: endpoint, }, nil } diff --git a/pkg/operator/resources/realtimeapi/api.go b/pkg/operator/resources/realtimeapi/api.go index 02f3b12b3c..cb3e12ccb6 100644 --- a/pkg/operator/resources/realtimeapi/api.go +++ b/pkg/operator/resources/realtimeapi/api.go @@ -223,7 +223,7 @@ func DeleteAPI(apiName string, keepCache bool) error { return nil } -func GetAllAPIs(pods []kcore.Pod, deployments []kapps.Deployment) ([]schema.RealtimeAPI, error) { +func GetAllAPIs(pods []kcore.Pod, deployments []kapps.Deployment) ([]schema.APIResponse, error) { statuses, err := GetAllStatuses(deployments, pods) if err != nil { return nil, err @@ -240,7 +240,7 @@ func GetAllAPIs(pods []kcore.Pod, deployments []kapps.Deployment) ([]schema.Real return nil, err } - realtimeAPIs := make([]schema.RealtimeAPI, len(apis)) + realtimeAPIs := make([]schema.APIResponse, len(apis)) for i, api := range apis { endpoint, err := operator.APIEndpoint(&api) @@ -248,10 +248,10 @@ func GetAllAPIs(pods []kcore.Pod, deployments []kapps.Deployment) ([]schema.Real return nil, err } - realtimeAPIs[i] = schema.RealtimeAPI{ + realtimeAPIs[i] = schema.APIResponse{ Spec: api, - Status: statuses[i], - Metrics: allMetrics[i], + Status: &statuses[i], + Metrics: &allMetrics[i], Endpoint: endpoint, } } @@ -271,35 +271,35 @@ func namesAndIDsFromStatuses(statuses []status.Status) ([]string, []string) { return apiNames, apiIDs } -func GetAPIByName(deployedResource *operator.DeployedResource) (*schema.GetAPIResponse, error) { +func GetAPIByName(deployedResource *operator.DeployedResource) (schema.APIResponse, error) { status, err := GetStatus(deployedResource.Name) if err != nil { - return nil, err + return schema.APIResponse{}, err } api, err := operator.DownloadAPISpec(status.APIName, status.APIID) if err != nil { - return nil, err + return schema.APIResponse{}, err } metrics, err := GetMetrics(api) if err != nil { - return nil, err + return schema.APIResponse{}, err } apiEndpoint, err := operator.APIEndpoint(api) if err != nil { - return nil, err + return schema.APIResponse{}, err } - return &schema.GetAPIResponse{ - RealtimeAPI: &schema.RealtimeAPI{ - Spec: *api, - Status: *status, - Metrics: *metrics, - Endpoint: apiEndpoint, - DashboardURL: DashboardURL(), - }, + dashboardURL := DashboardURL() + + return schema.APIResponse{ + Spec: *api, + Status: status, + Metrics: metrics, + Endpoint: apiEndpoint, + DashboardURL: &dashboardURL, }, nil } diff --git a/pkg/operator/resources/resources.go b/pkg/operator/resources/resources.go index bd50d39b28..4d5907c869 100644 --- a/pkg/operator/resources/resources.go +++ b/pkg/operator/resources/resources.go @@ -73,7 +73,7 @@ func GetDeployedResourceByNameOrNil(resourceName string) (*operator.DeployedReso }, nil } -func Deploy(projectBytes []byte, configFileName string, configBytes []byte, force bool) (*schema.DeployResponse, error) { +func Deploy(projectBytes []byte, configFileName string, configBytes []byte, force bool) ([]schema.DeployResult, error) { projectID := hash.Bytes(projectBytes) projectKey := spec.ProjectKey(projectID, config.Cluster.ClusterName) projectFileMap, err := archive.UnzipMemToMem(projectBytes) @@ -110,23 +110,27 @@ func Deploy(projectBytes []byte, configFileName string, configBytes []byte, forc // This is done if user specifies RealtimeAPIs in same file as TrafficSplitter apiConfigs = append(ExclusiveFilterAPIsByKind(apiConfigs, userconfig.TrafficSplitterKind), InclusiveFilterAPIsByKind(apiConfigs, userconfig.TrafficSplitterKind)...) - results := make([]schema.DeployResult, len(apiConfigs)) - for i, apiConfig := range apiConfigs { + results := make([]schema.DeployResult, 0, len(apiConfigs)) + for i := range apiConfigs { + apiConfig := apiConfigs[i] api, msg, err := UpdateAPI(&apiConfig, projectID, force) - results[i].Message = msg + + result := schema.DeployResult{ + Message: msg, + API: api, + } + if err != nil { - results[i].Error = errors.ErrorStr(err) - } else { - results[i].API = api + result.Error = errors.ErrorStr(err) } + + results = append(results, result) } - return &schema.DeployResponse{ - Results: results, - }, nil + return results, nil } -func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*spec.API, string, error) { +func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*schema.APIResponse, string, error) { deployedResource, err := GetDeployedResourceByNameOrNil(apiConfig.Name) if err != nil { return nil, "", err @@ -136,16 +140,30 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*spec.A return nil, "", ErrorCannotChangeKindOfDeployedAPI(apiConfig.Name, apiConfig.Kind, deployedResource.Kind) } + var api *spec.API + var msg string switch apiConfig.Kind { case userconfig.RealtimeAPIKind: - return realtimeapi.UpdateAPI(apiConfig, projectID, force) + api, msg, err = realtimeapi.UpdateAPI(apiConfig, projectID, force) case userconfig.BatchAPIKind: - return batchapi.UpdateAPI(apiConfig, projectID) + api, msg, err = batchapi.UpdateAPI(apiConfig, projectID) case userconfig.TrafficSplitterKind: - return trafficsplitter.UpdateAPI(apiConfig, force) + api, msg, err = trafficsplitter.UpdateAPI(apiConfig, force) default: return nil, "", ErrorOperationIsOnlySupportedForKind(*deployedResource, userconfig.RealtimeAPIKind, userconfig.BatchAPIKind, userconfig.TrafficSplitterKind) // unexpected } + + if err == nil && api != nil { + apiEndpoint, _ := operator.APIEndpoint(api) + + apiSpec := *api + + return &schema.APIResponse{ + Spec: apiSpec, + Endpoint: apiEndpoint, + }, msg, err + } + return nil, msg, err } func RefreshAPI(apiName string, force bool) (string, error) { @@ -217,7 +235,7 @@ func DeleteAPI(apiName string, keepCache bool) (*schema.DeleteResponse, error) { }, nil } -func GetAPIs() (*schema.GetAPIsResponse, error) { +func GetAPIs() ([]schema.APIResponse, error) { var deployments []kapps.Deployment var k8sJobs []kbatch.Job var pods []kcore.Pod @@ -286,29 +304,40 @@ func GetAPIs() (*schema.GetAPIsResponse, error) { if err != nil { return nil, err } - return &schema.GetAPIsResponse{ - BatchAPIs: batchAPIList, - RealtimeAPIs: realtimeAPIList, - TrafficSplitters: trafficSplitterList, - }, nil + + response := make([]schema.APIResponse, 0, len(realtimeAPIList)+len(batchAPIList)+len(trafficSplitterList)) + + response = append(response, realtimeAPIList...) + response = append(response, batchAPIList...) + response = append(response, trafficSplitterList...) + + return response, nil } -func GetAPI(apiName string) (*schema.GetAPIResponse, error) { +func GetAPI(apiName string) ([]schema.APIResponse, error) { deployedResource, err := GetDeployedResourceByName(apiName) if err != nil { return nil, err } + var api schema.APIResponse + switch deployedResource.Kind { case userconfig.RealtimeAPIKind: - return realtimeapi.GetAPIByName(deployedResource) + api, err = realtimeapi.GetAPIByName(deployedResource) case userconfig.BatchAPIKind: - return batchapi.GetAPIByName(deployedResource) + api, err = batchapi.GetAPIByName(deployedResource) case userconfig.TrafficSplitterKind: - return trafficsplitter.GetAPIByName(deployedResource) + api, err = trafficsplitter.GetAPIByName(deployedResource) default: return nil, ErrorOperationIsOnlySupportedForKind(*deployedResource, userconfig.RealtimeAPIKind, userconfig.BatchAPIKind) // unexpected } + + if err != nil { + return nil, err + } + + return []schema.APIResponse{api}, nil } //checkIfUsedByTrafficSplitter checks if api is used by a deployed TrafficSplitter diff --git a/pkg/operator/resources/trafficsplitter/api.go b/pkg/operator/resources/trafficsplitter/api.go index f0f4cd9420..a02e4b1699 100644 --- a/pkg/operator/resources/trafficsplitter/api.go +++ b/pkg/operator/resources/trafficsplitter/api.go @@ -141,10 +141,10 @@ func getTrafficSplitterDestinations(trafficSplitter *spec.API) []k8s.Destination return destinations } -func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService) ([]schema.TrafficSplitter, error) { +func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService) ([]schema.APIResponse, error) { apiNames := []string{} apiIDs := []string{} - trafficSplitters := []schema.TrafficSplitter{} + trafficSplitters := []schema.APIResponse{} for _, virtualService := range virtualServices { if virtualService.Labels["apiKind"] == userconfig.TrafficSplitterKind.String() { @@ -164,7 +164,7 @@ func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService) ([]schem return nil, err } - trafficSplitters = append(trafficSplitters, schema.TrafficSplitter{ + trafficSplitters = append(trafficSplitters, schema.APIResponse{ Spec: trafficSplitter, Endpoint: endpoint, }) @@ -173,22 +173,20 @@ func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService) ([]schem return trafficSplitters, nil } -func GetAPIByName(deployedResource *operator.DeployedResource) (*schema.GetAPIResponse, error) { +func GetAPIByName(deployedResource *operator.DeployedResource) (schema.APIResponse, error) { api, err := operator.DownloadAPISpec(deployedResource.Name, deployedResource.VirtualService.Labels["apiID"]) if err != nil { - return nil, err + return schema.APIResponse{}, err } endpoint, err := operator.APIEndpoint(api) if err != nil { - return nil, err + return schema.APIResponse{}, err } - return &schema.GetAPIResponse{ - TrafficSplitter: &schema.TrafficSplitter{ - Spec: *api, - Endpoint: endpoint, - }, + return schema.APIResponse{ + Spec: *api, + Endpoint: endpoint, }, nil } diff --git a/pkg/operator/schema/schema.go b/pkg/operator/schema/schema.go index 653332f7fe..bb05a8281c 100644 --- a/pkg/operator/schema/schema.go +++ b/pkg/operator/schema/schema.go @@ -42,45 +42,19 @@ type NodeInfo struct { ComputeUserRequested userconfig.Compute `json:"compute_user_requested"` // total resources requested by user on a node } -type DeployResponse struct { - Results []DeployResult `json:"results"` -} - type DeployResult struct { - API *spec.API - Message string - Error string -} - -type GetAPIsResponse struct { - RealtimeAPIs []RealtimeAPI `json:"realtime_apis"` - BatchAPIs []BatchAPI `json:"batch_apis"` - TrafficSplitters []TrafficSplitter `json:"traffic_splitters"` -} - -type RealtimeAPI struct { - Spec spec.API `json:"spec"` - Status status.Status `json:"status"` - Metrics metrics.Metrics `json:"metrics"` - Endpoint string `json:"endpoint"` - DashboardURL string `json:"dashboard_url"` -} - -type TrafficSplitter struct { - Spec spec.API `json:"spec"` - Endpoint string `json:"endpoint"` -} - -type GetAPIResponse struct { - RealtimeAPI *RealtimeAPI `json:"realtime_api"` - BatchAPI *BatchAPI `json:"batch_api"` - TrafficSplitter *TrafficSplitter `json:"traffic_splitter"` + API *APIResponse `json:"api"` + Message string `json:"message"` + Error string `json:"error"` } -type BatchAPI struct { - Spec spec.API `json:"spec"` - JobStatuses []status.JobStatus `json:"job_statuses"` - Endpoint string `json:"endpoint"` +type APIResponse struct { + Spec spec.API `json:"spec"` + Status *status.Status `json:"status,omitempty"` + Metrics *metrics.Metrics `json:"metrics,omitempty"` + Endpoint string `json:"endpoint"` + DashboardURL *string `json:"dashboard_url,omitempty"` + JobStatuses []status.JobStatus `json:"job_statuses,omitempty"` } type GetJobResponse struct { From 2faad84270d127bd74a7a981acf4dd020d554d9f Mon Sep 17 00:00:00 2001 From: vishal Date: Wed, 14 Oct 2020 18:35:15 -0400 Subject: [PATCH 2/4] Self review --- cli/cmd/get.go | 12 +++++++- pkg/operator/resources/batchapi/api.go | 28 ++++++++++--------- pkg/operator/resources/realtimeapi/api.go | 24 ++++++++-------- pkg/operator/resources/resources.go | 14 ++-------- pkg/operator/resources/trafficsplitter/api.go | 14 ++++++---- 5 files changed, 50 insertions(+), 42 deletions(-) diff --git a/cli/cmd/get.go b/cli/cmd/get.go index bf4e277ebe..76740ae8ba 100644 --- a/cli/cmd/get.go +++ b/cli/cmd/get.go @@ -459,6 +459,10 @@ func getAPI(env cliconfig.Environment, apiName string) (string, error) { return string(bytes), nil } + if len(apisRes) == 0 { + exit.Error(errors.ErrorUnexpected(fmt.Sprintf("unable to find API %s", apiName))) + } + apiRes := apisRes[0] if apiRes.Spec.Kind == userconfig.RealtimeAPIKind { @@ -483,7 +487,13 @@ func getAPI(env cliconfig.Environment, apiName string) (string, error) { return string(bytes), nil } - return realtimeAPITable(apisRes[0], env) + if len(apisRes) == 0 { + exit.Error(errors.ErrorUnexpected(fmt.Sprintf("unable to find API %s", apiName))) + } + + apiRes := apisRes[0] + + return realtimeAPITable(apiRes, env) } func titleStr(title string) string { diff --git a/pkg/operator/resources/batchapi/api.go b/pkg/operator/resources/batchapi/api.go index df5eae2c9c..49cfe28bd0 100644 --- a/pkg/operator/resources/batchapi/api.go +++ b/pkg/operator/resources/batchapi/api.go @@ -254,18 +254,18 @@ func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService, k8sJobs return batchAPIList, nil } -func GetAPIByName(deployedResource *operator.DeployedResource) (schema.APIResponse, error) { +func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResponse, error) { virtualService := deployedResource.VirtualService apiID := virtualService.Labels["apiID"] api, err := operator.DownloadAPISpec(deployedResource.Name, apiID) if err != nil { - return schema.APIResponse{}, err + return nil, err } k8sJobs, err := config.K8s.ListJobsByLabel("apiName", deployedResource.Name) if err != nil { - return schema.APIResponse{}, err + return nil, err } jobIDToK8sJobMap := map[string]*kbatch.Job{} @@ -275,12 +275,12 @@ func GetAPIByName(deployedResource *operator.DeployedResource) (schema.APIRespon endpoint, err := operator.APIEndpoint(api) if err != nil { - return schema.APIResponse{}, err + return nil, err } pods, err := config.K8s.ListPodsByLabel("apiName", deployedResource.Name) if err != nil { - return schema.APIResponse{}, err + return nil, err } jobIDToPodsMap := map[string][]kcore.Pod{} @@ -290,7 +290,7 @@ func GetAPIByName(deployedResource *operator.DeployedResource) (schema.APIRespon inProgressJobKeys, err := listAllInProgressJobKeysByAPI(deployedResource.Name) if err != nil { - return schema.APIResponse{}, err + return nil, err } jobStatuses := []status.JobStatus{} @@ -298,7 +298,7 @@ func GetAPIByName(deployedResource *operator.DeployedResource) (schema.APIRespon for _, jobKey := range inProgressJobKeys { jobStatus, err := getJobStatusFromK8sJob(jobKey, jobIDToK8sJobMap[jobKey.ID], jobIDToPodsMap[jobKey.ID]) if err != nil { - return schema.APIResponse{}, err + return nil, err } jobStatuses = append(jobStatuses, *jobStatus) @@ -308,7 +308,7 @@ func GetAPIByName(deployedResource *operator.DeployedResource) (schema.APIRespon if len(jobStatuses) < 10 { jobStates, err := getMostRecentlySubmittedJobStates(deployedResource.Name, 10+len(jobStatuses)) if err != nil { - return schema.APIResponse{}, err + return nil, err } for _, jobState := range jobStates { if jobIDSet.Has(jobState.ID) { @@ -318,7 +318,7 @@ func GetAPIByName(deployedResource *operator.DeployedResource) (schema.APIRespon jobStatus, err := getJobStatusFromJobState(jobState, nil, nil) if err != nil { - return schema.APIResponse{}, err + return nil, err } jobStatuses = append(jobStatuses, *jobStatus) @@ -328,9 +328,11 @@ func GetAPIByName(deployedResource *operator.DeployedResource) (schema.APIRespon } } - return schema.APIResponse{ - Spec: *api, - JobStatuses: jobStatuses, - Endpoint: endpoint, + return []schema.APIResponse{ + { + Spec: *api, + JobStatuses: jobStatuses, + Endpoint: endpoint, + }, }, nil } diff --git a/pkg/operator/resources/realtimeapi/api.go b/pkg/operator/resources/realtimeapi/api.go index cb3e12ccb6..093a2c5bc7 100644 --- a/pkg/operator/resources/realtimeapi/api.go +++ b/pkg/operator/resources/realtimeapi/api.go @@ -271,35 +271,37 @@ func namesAndIDsFromStatuses(statuses []status.Status) ([]string, []string) { return apiNames, apiIDs } -func GetAPIByName(deployedResource *operator.DeployedResource) (schema.APIResponse, error) { +func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResponse, error) { status, err := GetStatus(deployedResource.Name) if err != nil { - return schema.APIResponse{}, err + return nil, err } api, err := operator.DownloadAPISpec(status.APIName, status.APIID) if err != nil { - return schema.APIResponse{}, err + return nil, err } metrics, err := GetMetrics(api) if err != nil { - return schema.APIResponse{}, err + return nil, err } apiEndpoint, err := operator.APIEndpoint(api) if err != nil { - return schema.APIResponse{}, err + return nil, err } dashboardURL := DashboardURL() - return schema.APIResponse{ - Spec: *api, - Status: status, - Metrics: metrics, - Endpoint: apiEndpoint, - DashboardURL: &dashboardURL, + return []schema.APIResponse{ + { + Spec: *api, + Status: status, + Metrics: metrics, + Endpoint: apiEndpoint, + DashboardURL: &dashboardURL, + }, }, nil } diff --git a/pkg/operator/resources/resources.go b/pkg/operator/resources/resources.go index 4d5907c869..e4bec35264 100644 --- a/pkg/operator/resources/resources.go +++ b/pkg/operator/resources/resources.go @@ -320,24 +320,16 @@ func GetAPI(apiName string) ([]schema.APIResponse, error) { return nil, err } - var api schema.APIResponse - switch deployedResource.Kind { case userconfig.RealtimeAPIKind: - api, err = realtimeapi.GetAPIByName(deployedResource) + return realtimeapi.GetAPIByName(deployedResource) case userconfig.BatchAPIKind: - api, err = batchapi.GetAPIByName(deployedResource) + return batchapi.GetAPIByName(deployedResource) case userconfig.TrafficSplitterKind: - api, err = trafficsplitter.GetAPIByName(deployedResource) + return trafficsplitter.GetAPIByName(deployedResource) default: return nil, ErrorOperationIsOnlySupportedForKind(*deployedResource, userconfig.RealtimeAPIKind, userconfig.BatchAPIKind) // unexpected } - - if err != nil { - return nil, err - } - - return []schema.APIResponse{api}, nil } //checkIfUsedByTrafficSplitter checks if api is used by a deployed TrafficSplitter diff --git a/pkg/operator/resources/trafficsplitter/api.go b/pkg/operator/resources/trafficsplitter/api.go index a02e4b1699..0a644101b8 100644 --- a/pkg/operator/resources/trafficsplitter/api.go +++ b/pkg/operator/resources/trafficsplitter/api.go @@ -173,20 +173,22 @@ func GetAllAPIs(virtualServices []istioclientnetworking.VirtualService) ([]schem return trafficSplitters, nil } -func GetAPIByName(deployedResource *operator.DeployedResource) (schema.APIResponse, error) { +func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResponse, error) { api, err := operator.DownloadAPISpec(deployedResource.Name, deployedResource.VirtualService.Labels["apiID"]) if err != nil { - return schema.APIResponse{}, err + return nil, err } endpoint, err := operator.APIEndpoint(api) if err != nil { - return schema.APIResponse{}, err + return nil, err } - return schema.APIResponse{ - Spec: *api, - Endpoint: endpoint, + return []schema.APIResponse{ + { + Spec: *api, + Endpoint: endpoint, + }, }, nil } From bf373c35e2966d455ceef7adbcb4b7ac7ce1ba12 Mon Sep 17 00:00:00 2001 From: vishal Date: Wed, 14 Oct 2020 18:40:28 -0400 Subject: [PATCH 3/4] More cleanup --- cli/cmd/get.go | 1 - cli/local/get.go | 3 ++- pkg/operator/resources/resources.go | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/cli/cmd/get.go b/cli/cmd/get.go index 76740ae8ba..d7d5fa0d4c 100644 --- a/cli/cmd/get.go +++ b/cli/cmd/get.go @@ -273,7 +273,6 @@ func getAPIsInAllEnvironments() (string, error) { if len(allBatchAPIs) > 0 { out += "\n" - } out += t.MustFormat() diff --git a/cli/local/get.go b/cli/local/get.go index 58cf433307..330e61af3d 100644 --- a/cli/local/get.go +++ b/cli/local/get.go @@ -42,7 +42,8 @@ func GetAPIs() ([]schema.APIResponse, error) { } apiResponses := make([]schema.APIResponse, len(apiSpecList)) - for i, apiSpec := range apiSpecList { + for i := range apiSpecList { + apiSpec := apiSpecList[i] apiStatus, err := GetAPIStatus(&apiSpec) if err != nil { return nil, err diff --git a/pkg/operator/resources/resources.go b/pkg/operator/resources/resources.go index e4bec35264..9098221ed4 100644 --- a/pkg/operator/resources/resources.go +++ b/pkg/operator/resources/resources.go @@ -155,7 +155,6 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*schema if err == nil && api != nil { apiEndpoint, _ := operator.APIEndpoint(api) - apiSpec := *api return &schema.APIResponse{ From d7f256871f1182d39d0c6c1fd849772ee6585e8e Mon Sep 17 00:00:00 2001 From: vishal Date: Wed, 14 Oct 2020 22:14:12 -0400 Subject: [PATCH 4/4] Respond to PR comments --- cli/cluster/get.go | 8 ++++---- cli/cmd/get.go | 11 +++++++---- cli/cmd/predict.go | 4 ++++ cli/local/get.go | 8 ++------ pkg/operator/endpoints/get_job.go | 2 +- pkg/operator/resources/resources.go | 6 +++--- pkg/operator/schema/schema.go | 2 +- 7 files changed, 22 insertions(+), 19 deletions(-) diff --git a/cli/cluster/get.go b/cli/cluster/get.go index 2afc54ab8f..2dee81a23d 100644 --- a/cli/cluster/get.go +++ b/cli/cluster/get.go @@ -51,16 +51,16 @@ func GetAPI(operatorConfig OperatorConfig, apiName string) ([]schema.APIResponse return apiRes, nil } -func GetJob(operatorConfig OperatorConfig, apiName string, jobID string) (schema.GetJobResponse, error) { +func GetJob(operatorConfig OperatorConfig, apiName string, jobID string) (schema.JobResponse, error) { endpoint := path.Join("/batch", apiName, jobID) httpRes, err := HTTPGet(operatorConfig, endpoint) if err != nil { - return schema.GetJobResponse{}, err + return schema.JobResponse{}, err } - var jobRes schema.GetJobResponse + var jobRes schema.JobResponse if err = json.Unmarshal(httpRes, &jobRes); err != nil { - return schema.GetJobResponse{}, errors.Wrap(err, endpoint, string(httpRes)) + return schema.JobResponse{}, errors.Wrap(err, endpoint, string(httpRes)) } return jobRes, nil diff --git a/cli/cmd/get.go b/cli/cmd/get.go index d7d5fa0d4c..d36e1d527f 100644 --- a/cli/cmd/get.go +++ b/cli/cmd/get.go @@ -464,13 +464,16 @@ func getAPI(env cliconfig.Environment, apiName string) (string, error) { apiRes := apisRes[0] - if apiRes.Spec.Kind == userconfig.RealtimeAPIKind { + switch apiRes.Spec.Kind { + case userconfig.RealtimeAPIKind: return realtimeAPITable(apiRes, env) - } - if apiRes.Spec.Kind != userconfig.TrafficSplitterKind { + case userconfig.TrafficSplitterKind: return trafficSplitterTable(apiRes, env) + case userconfig.BatchAPIKind: + return batchAPITable(apiRes), nil + default: + return "", errors.ErrorUnexpected(fmt.Sprintf("encountered unexpected kind %s for api %s", apiRes.Spec.Kind, apiRes.Spec.Name)) } - return batchAPITable(apiRes), nil } apisRes, err := local.GetAPI(apiName) diff --git a/cli/cmd/predict.go b/cli/cmd/predict.go index 31721bbe0b..4c5fb4e32c 100644 --- a/cli/cmd/predict.go +++ b/cli/cmd/predict.go @@ -76,6 +76,10 @@ var _predictCmd = &cobra.Command{ } } + if len(apisRes) == 0 { + exit.Error(errors.ErrorUnexpected(fmt.Sprintf("unable to find API %s", apiName))) + } + apiRes := apisRes[0] if apiRes.Spec.Kind != userconfig.RealtimeAPIKind { diff --git a/cli/local/get.go b/cli/local/get.go index 330e61af3d..21f9875c2f 100644 --- a/cli/local/get.go +++ b/cli/local/get.go @@ -54,13 +54,11 @@ func GetAPIs() ([]schema.APIResponse, error) { return nil, err } - apiPort := apiSpec.Networking.LocalPort - apiResponses[i] = schema.APIResponse{ Spec: apiSpec, Status: &apiStatus, Metrics: &metrics, - Endpoint: fmt.Sprintf("http://localhost:%d", *apiPort), + Endpoint: fmt.Sprintf("http://localhost:%d", *apiSpec.Networking.LocalPort), } } @@ -163,14 +161,12 @@ func GetAPI(apiName string) ([]schema.APIResponse, error) { apiContainer = containers[1] } - apiPort := apiSpec.Networking.LocalPort - return []schema.APIResponse{ { Spec: *apiSpec, Status: &apiStatus, Metrics: &apiMetrics, - Endpoint: fmt.Sprintf("http://localhost:%d", *apiPort), + Endpoint: fmt.Sprintf("http://localhost:%d", *apiSpec.Networking.LocalPort), }, }, nil } diff --git a/pkg/operator/endpoints/get_job.go b/pkg/operator/endpoints/get_job.go index 62f8dfa950..867e2232cf 100644 --- a/pkg/operator/endpoints/get_job.go +++ b/pkg/operator/endpoints/get_job.go @@ -64,7 +64,7 @@ func GetJob(w http.ResponseWriter, r *http.Request) { return } - response := schema.GetJobResponse{ + response := schema.JobResponse{ JobStatus: *jobStatus, APISpec: *spec, Endpoint: urls.Join(endpoint, jobKey.ID), diff --git a/pkg/operator/resources/resources.go b/pkg/operator/resources/resources.go index 9098221ed4..c22cc80eb9 100644 --- a/pkg/operator/resources/resources.go +++ b/pkg/operator/resources/resources.go @@ -155,13 +155,13 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*schema if err == nil && api != nil { apiEndpoint, _ := operator.APIEndpoint(api) - apiSpec := *api return &schema.APIResponse{ - Spec: apiSpec, + Spec: *api, Endpoint: apiEndpoint, - }, msg, err + }, msg, nil } + return nil, msg, err } diff --git a/pkg/operator/schema/schema.go b/pkg/operator/schema/schema.go index bb05a8281c..406cc8812b 100644 --- a/pkg/operator/schema/schema.go +++ b/pkg/operator/schema/schema.go @@ -57,7 +57,7 @@ type APIResponse struct { JobStatuses []status.JobStatus `json:"job_statuses,omitempty"` } -type GetJobResponse struct { +type JobResponse struct { APISpec spec.API `json:"api_spec"` JobStatus status.JobStatus `json:"job_status"` Endpoint string `json:"endpoint"`