diff --git a/cli/cluster/patch.go b/cli/cluster/patch.go new file mode 100644 index 0000000000..92ebf111ab --- /dev/null +++ b/cli/cluster/patch.go @@ -0,0 +1,51 @@ +/* +Copyright 2020 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "path/filepath" + + "github.com/cortexlabs/cortex/pkg/lib/errors" + "github.com/cortexlabs/cortex/pkg/lib/files" + "github.com/cortexlabs/cortex/pkg/lib/json" + s "github.com/cortexlabs/cortex/pkg/lib/strings" + "github.com/cortexlabs/cortex/pkg/operator/schema" +) + +func Patch(operatorConfig OperatorConfig, configPath string, force bool) ([]schema.DeployResult, error) { + params := map[string]string{ + "force": s.Bool(force), + "configFileName": filepath.Base(configPath), + } + + configBytes, err := files.ReadFileBytes(configPath) + if err != nil { + return nil, err + } + + response, err := HTTPPostJSON(operatorConfig, "/patch", configBytes, params) + if err != nil { + return nil, err + } + + var deployResults []schema.DeployResult + if err := json.Unmarshal(response, &deployResults); err != nil { + return nil, errors.Wrap(err, "/patch", string(response)) + } + + return deployResults, nil +} diff --git a/cli/cmd/cluster.go b/cli/cmd/cluster.go index 2c849ed12d..0d23ee33d8 100644 --- a/cli/cmd/cluster.go +++ b/cli/cmd/cluster.go @@ -46,6 +46,7 @@ import ( "github.com/cortexlabs/cortex/pkg/types/clusterconfig" "github.com/cortexlabs/cortex/pkg/types/clusterstate" "github.com/cortexlabs/cortex/pkg/types/userconfig" + "github.com/cortexlabs/yaml" "github.com/spf13/cobra" ) @@ -687,7 +688,12 @@ var _clusterExportCmd = &cobra.Command{ exit.Error(err) } - err = awsClient.DownloadFileFromS3(info.ClusterConfig.Bucket, apiResponse.Spec.RawAPIKey(info.ClusterConfig.ClusterName), path.Join(baseDir, apiResponse.Spec.FileName)) + yamlBytes, err := yaml.Marshal(apiResponse.Spec.API.SubmittedAPISpec) + if err != nil { + exit.Error(err) + } + + err = files.WriteFile(yamlBytes, path.Join(baseDir, apiResponse.Spec.FileName)) if err != nil { exit.Error(err) } diff --git a/cli/cmd/deploy.go b/cli/cmd/deploy.go index 28775835d7..7dfc79537f 100644 --- a/cli/cmd/deploy.go +++ b/cli/cmd/deploy.go @@ -195,6 +195,7 @@ func findProjectFiles(provider types.ProviderType, configPath string) ([]string, return nil, err } + // Include .env file containing environment variables dotEnvPath := path.Join(projectRoot, ".env") if files.IsFile(dotEnvPath) { projectPaths = append(projectPaths, dotEnvPath) diff --git a/cli/cmd/patch.go b/cli/cmd/patch.go new file mode 100644 index 0000000000..b66f103385 --- /dev/null +++ b/cli/cmd/patch.go @@ -0,0 +1,99 @@ +/* +Copyright 2020 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cmd + +import ( + "fmt" + "strings" + + "github.com/cortexlabs/cortex/cli/cluster" + "github.com/cortexlabs/cortex/cli/local" + "github.com/cortexlabs/cortex/cli/types/flags" + "github.com/cortexlabs/cortex/pkg/lib/exit" + libjson "github.com/cortexlabs/cortex/pkg/lib/json" + "github.com/cortexlabs/cortex/pkg/lib/print" + "github.com/cortexlabs/cortex/pkg/lib/telemetry" + "github.com/cortexlabs/cortex/pkg/operator/schema" + "github.com/cortexlabs/cortex/pkg/types" + "github.com/spf13/cobra" +) + +var ( + _flagPatchEnv string + _flagPatchForce bool +) + +func patchInit() { + _patchCmd.Flags().SortFlags = false + _patchCmd.Flags().StringVarP(&_flagPatchEnv, "env", "e", getDefaultEnv(_generalCommandType), "environment to use") + _patchCmd.Flags().BoolVarP(&_flagPatchForce, "force", "f", false, "override the in-progress api update") + _patchCmd.Flags().VarP(&_flagOutput, "output", "o", fmt.Sprintf("output format: one of %s", strings.Join(flags.UserOutputTypeStrings(), "|"))) +} + +var _patchCmd = &cobra.Command{ + Use: "patch [CONFIG_FILE]", + Short: "update API configuration for a deployed API", + Args: cobra.RangeArgs(0, 1), + Run: func(cmd *cobra.Command, args []string) { + env, err := ReadOrConfigureEnv(_flagPatchEnv) + if err != nil { + telemetry.Event("cli.patch") + exit.Error(err) + } + telemetry.Event("cli.patch", map[string]interface{}{"provider": env.Provider.String(), "env_name": env.Name}) + + err = printEnvIfNotSpecified(_flagPatchEnv, cmd) + if err != nil { + exit.Error(err) + } + + configPath := getConfigPath(args) + + var deployResults []schema.DeployResult + if env.Provider == types.AWSProviderType { + deployResults, err = cluster.Patch(MustGetOperatorConfig(env.Name), configPath, _flagPatchForce) + if err != nil { + exit.Error(err) + } + } else { + deployResults, err = local.Patch(env, configPath) + if err != nil { + exit.Error(err) + } + } + + switch _flagOutput { + case flags.JSONOutputType: + bytes, err := libjson.Marshal(deployResults) + if err != nil { + exit.Error(err) + } + fmt.Println(string(bytes)) + case flags.PrettyOutputType: + message := deployMessage(deployResults, env.Name) + if didAnyResultsError(deployResults) { + print.StderrBoldFirstBlock(message) + } else { + print.BoldFirstBlock(message) + } + } + + if didAnyResultsError(deployResults) { + exit.Error(nil) + } + }, +} diff --git a/cli/cmd/root.go b/cli/cmd/root.go index 3c8bd91462..3f4e2bc948 100644 --- a/cli/cmd/root.go +++ b/cli/cmd/root.go @@ -127,6 +127,7 @@ func init() { envInit() getInit() logsInit() + patchInit() predictInit() refreshInit() versionInit() @@ -166,6 +167,7 @@ func Execute() { _rootCmd.AddCommand(_deployCmd) _rootCmd.AddCommand(_getCmd) + _rootCmd.AddCommand(_patchCmd) _rootCmd.AddCommand(_logsCmd) _rootCmd.AddCommand(_refreshCmd) _rootCmd.AddCommand(_predictCmd) diff --git a/cli/local/api.go b/cli/local/api.go index 1f85c4606d..b56a7a48ba 100644 --- a/cli/local/api.go +++ b/cli/local/api.go @@ -37,7 +37,7 @@ import ( var _deploymentID = "local" -func UpdateAPI(apiConfig *userconfig.API, models []spec.CuratedModelResource, configPath string, projectID string, deployDisallowPrompt bool, awsClient *aws.Client) (*schema.APIResponse, string, error) { +func UpdateAPI(apiConfig *userconfig.API, models []spec.CuratedModelResource, projectRoot string, projectID string, deployDisallowPrompt bool, awsClient *aws.Client) (*schema.APIResponse, string, error) { telemetry.Event("operator.deploy", apiConfig.TelemetryEvent(types.LocalProviderType)) var incompatibleVersion string @@ -83,7 +83,7 @@ func UpdateAPI(apiConfig *userconfig.API, models []spec.CuratedModelResource, co } } - newAPISpec.LocalProjectDir = files.Dir(configPath) + newAPISpec.LocalProjectDir = projectRoot if areAPIsEqual(newAPISpec, prevAPISpec) { return toAPIResponse(newAPISpec), fmt.Sprintf("%s is up to date", newAPISpec.Resource.UserString()), nil diff --git a/cli/local/deploy.go b/cli/local/deploy.go index 2d2410620a..b017d3aee6 100644 --- a/cli/local/deploy.go +++ b/cli/local/deploy.go @@ -30,6 +30,7 @@ import ( "github.com/cortexlabs/cortex/pkg/operator/schema" "github.com/cortexlabs/cortex/pkg/types" "github.com/cortexlabs/cortex/pkg/types/spec" + "github.com/cortexlabs/cortex/pkg/types/userconfig" ) func Deploy(env cliconfig.Environment, configPath string, projectFileList []string, deployDisallowPrompt bool) ([]schema.DeployResult, error) { @@ -45,11 +46,26 @@ func Deploy(env cliconfig.Environment, configPath string, projectFileList []stri return nil, err } - projectFiles, err := newProjectFiles(projectFileList, configPath) + if !files.IsAbsOrTildePrefixed(configPath) { + return nil, errors.ErrorUnexpected(fmt.Sprintf("%s is not an absolute path", configPath)) + } + projectRoot := files.Dir(configPath) + + projectFiles, err := newProjectFiles(projectFileList, projectRoot) + if err != nil { + return nil, err + } + + apiConfigs, err := spec.ExtractAPIConfigs(configBytes, types.LocalProviderType, configFileName, nil, nil) if err != nil { return nil, err } + return deploy(env, apiConfigs, projectFiles, deployDisallowPrompt) +} + +func deploy(env cliconfig.Environment, apiConfigs []userconfig.API, projectFiles ProjectFiles, deployDisallowPrompt bool) ([]schema.DeployResult, error) { + var err error var awsClient *aws.Client var gcpClient *gcp.Client @@ -72,11 +88,6 @@ func Deploy(env cliconfig.Environment, configPath string, projectFileList []stri } } - apiConfigs, err := spec.ExtractAPIConfigs(configBytes, types.LocalProviderType, configFileName, nil, nil) - if err != nil { - return nil, err - } - models := []spec.CuratedModelResource{} err = ValidateLocalAPIs(apiConfigs, &models, projectFiles, awsClient, gcpClient) if err != nil { @@ -84,15 +95,16 @@ func Deploy(env cliconfig.Environment, configPath string, projectFileList []stri return nil, err } - projectID, err := files.HashFile(projectFileList[0], projectFileList[1:]...) + projectRelFilePaths := projectFiles.AllAbsPaths() + projectID, err := files.HashFile(projectRelFilePaths[0], projectRelFilePaths[1:]...) if err != nil { - return nil, errors.Wrap(err, "failed to hash directory", filepath.Dir(configPath)) + return nil, errors.Wrap(err, "failed to hash directory", projectFiles.projectRoot) } results := make([]schema.DeployResult, len(apiConfigs)) for i := range apiConfigs { apiConfig := apiConfigs[i] - api, msg, err := UpdateAPI(&apiConfig, models, configPath, projectID, deployDisallowPrompt, awsClient) + api, msg, err := UpdateAPI(&apiConfig, models, projectFiles.projectRoot, projectID, deployDisallowPrompt, awsClient) results[i].Message = msg if err != nil { results[i].Error = errors.Message(err) diff --git a/cli/local/patch.go b/cli/local/patch.go new file mode 100644 index 0000000000..eec6f2132e --- /dev/null +++ b/cli/local/patch.go @@ -0,0 +1,104 @@ +/* +Copyright 2020 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package local + +import ( + "path" + "path/filepath" + + "github.com/cortexlabs/cortex/cli/types/cliconfig" + "github.com/cortexlabs/cortex/pkg/lib/files" + "github.com/cortexlabs/cortex/pkg/operator/schema" + "github.com/cortexlabs/cortex/pkg/types" + "github.com/cortexlabs/cortex/pkg/types/spec" + "github.com/cortexlabs/cortex/pkg/types/userconfig" +) + +func Patch(env cliconfig.Environment, configPath string) ([]schema.DeployResult, error) { + configFileName := filepath.Base(configPath) + + configBytes, err := files.ReadFileBytes(configPath) + if err != nil { + return nil, err + } + + apiConfigs, err := spec.ExtractAPIConfigs(configBytes, types.LocalProviderType, configFileName, nil, nil) + if err != nil { + return nil, err + } + + deployResults := []schema.DeployResult{} + for i := range apiConfigs { + apiConfig := &apiConfigs[i] + apiResponse, err := GetAPI(apiConfig.Name) + if err != nil { + return nil, err + } + + localProjectDir := apiResponse[0].Spec.LocalProjectDir + + projectFileList, err := findProjectFiles(localProjectDir) + if err != nil { + return nil, err + } + + projectFiles, err := newProjectFiles(projectFileList, localProjectDir) + if err != nil { + return nil, err + } + + deployResult, err := deploy(env, []userconfig.API{*apiConfig}, projectFiles, true) + if err != nil { + return nil, err + } + + deployResults = append(deployResults, deployResult...) + } + + return deployResults, nil +} + +func findProjectFiles(projectRoot string) ([]string, error) { + ignoreFns := []files.IgnoreFn{ + files.IgnoreCortexDebug, + files.IgnoreHiddenFiles, + files.IgnoreHiddenFolders, + files.IgnorePythonGeneratedFiles, + } + + cortexIgnorePath := path.Join(projectRoot, ".cortexignore") + if files.IsFile(cortexIgnorePath) { + cortexIgnore, err := files.GitIgnoreFn(cortexIgnorePath) + if err != nil { + return nil, err + } + ignoreFns = append(ignoreFns, cortexIgnore) + } + + projectPaths, err := files.ListDirRecursive(projectRoot, false, ignoreFns...) + if err != nil { + return nil, err + } + + // Include .env file containing environment variables + dotEnvPath := path.Join(projectRoot, ".env") + if files.IsFile(dotEnvPath) { + projectPaths = append(projectPaths, dotEnvPath) + } + + return projectPaths, nil +} diff --git a/cli/local/validations.go b/cli/local/validations.go index b4feeb0b47..6f63cfab7d 100644 --- a/cli/local/validations.go +++ b/cli/local/validations.go @@ -20,6 +20,7 @@ import ( "fmt" "math" "net" + "path" "path/filepath" "strings" @@ -47,12 +48,7 @@ type ProjectFiles struct { projectRoot string } -func newProjectFiles(projectFileList []string, configPath string) (ProjectFiles, error) { - if !files.IsAbsOrTildePrefixed(configPath) { - return ProjectFiles{}, errors.ErrorUnexpected(fmt.Sprintf("%s is not an absolute path", configPath)) - } - projectRoot := files.Dir(configPath) - +func newProjectFiles(projectFileList []string, projectRoot string) (ProjectFiles, error) { relFilePaths := make([]string, len(projectFileList)) for i, projectFilePath := range projectFileList { if !files.IsAbsOrTildePrefixed(projectFilePath) { @@ -74,6 +70,15 @@ func (projectFiles ProjectFiles) AllPaths() []string { return projectFiles.relFilePaths } +func (projectFiles ProjectFiles) AllAbsPaths() []string { + absPaths := make([]string, 0, len(projectFiles.relFilePaths)) + for _, relPath := range projectFiles.relFilePaths { + absPaths = append(absPaths, path.Join(projectFiles.projectRoot, relPath)) + } + + return absPaths +} + func (projectFiles ProjectFiles) GetFile(path string) ([]byte, error) { for _, projectFilePath := range projectFiles.relFilePaths { if path == projectFilePath { diff --git a/dev/generate_cli_md.sh b/dev/generate_cli_md.sh index 7992f91131..abd98ae440 100755 --- a/dev/generate_cli_md.sh +++ b/dev/generate_cli_md.sh @@ -34,6 +34,7 @@ commands=( "deploy" "get" "logs" + "patch" "refresh" "predict" "delete" diff --git a/docs/miscellaneous/cli.md b/docs/miscellaneous/cli.md index c5f5216f82..580afd9fc9 100644 --- a/docs/miscellaneous/cli.md +++ b/docs/miscellaneous/cli.md @@ -75,6 +75,21 @@ Flags: -h, --help help for logs ``` +### patch + +```text +update API configuration for a deployed API + +Usage: + cortex patch [CONFIG_FILE] [flags] + +Flags: + -e, --env string environment to use (default "local") + -f, --force override the in-progress api update + -o, --output string output format: one of pretty|json (default "pretty") + -h, --help help for patch +``` + ### refresh ```text diff --git a/docs/miscellaneous/python-client.md b/docs/miscellaneous/python-client.md index cc16c6f1d0..f8c767672b 100644 --- a/docs/miscellaneous/python-client.md +++ b/docs/miscellaneous/python-client.md @@ -14,6 +14,7 @@ _WARNING: you are on the master branch, please refer to the docs on the branch t * [list\_apis](#list_apis) * [get\_job](#get_job) * [refresh](#refresh) + * [patch](#patch) * [delete\_api](#delete_api) * [stop\_job](#stop_job) * [stream\_api\_logs](#stream_api_logs) @@ -63,7 +64,7 @@ from S3 and authenticate to ECR, and will be set in your Predictor. ## cluster\_client ```python -cluster_client(name: str, provider: str, operator_endpoint: str, aws_access_key_id: str, aws_secret_access_key: str) -> Client +cluster_client(name: str, provider: str, operator_endpoint: str, aws_access_key_id: Optional[str] = None, aws_secret_access_key: Optional[str] = None) -> Client ``` Create a new environment to connect to an existing Cortex Cluster, and initialize a client to deploy and manage APIs on that cluster. @@ -193,6 +194,19 @@ Restart all of the replicas for a Realtime API without downtime. - `api_name` - Name of the API to refresh. - `force` - Override an already in-progress API update. +## patch + +```python + | patch(api_spec: dict, force: bool = False) -> dict +``` + +Update the api specification for an API that has already been deployed. + +**Arguments**: + +- `api_spec` - The new api specification to apply +- `force` - Override an already in-progress API update. + ## delete\_api ```python diff --git a/pkg/lib/cast/interface.go b/pkg/lib/cast/interface.go index 0a0fa73f4a..b290d0e62c 100644 --- a/pkg/lib/cast/interface.go +++ b/pkg/lib/cast/interface.go @@ -694,6 +694,40 @@ func InterfaceToStrInterfaceMap(in interface{}) (map[string]interface{}, bool) { return out, true } +// Recursively casts interface->interface maps to string->interface maps +func JSONMarshallable(in interface{}) (interface{}, bool) { + if in == nil { + return nil, true + } + + if inMap, ok := InterfaceToInterfaceInterfaceMap(in); ok { + out := map[string]interface{}{} + for key, value := range inMap { + castedKey, ok := key.(string) + if !ok { + return nil, false + } + castedValue, ok := JSONMarshallable(value) + if !ok { + return nil, false + } + out[castedKey] = castedValue + } + return out, true + } else if inSlice, ok := InterfaceToInterfaceSlice(in); ok { + result := make([]interface{}, 0, len(inSlice)) + for _, inValue := range inSlice { + castedInValue, ok := JSONMarshallable(inValue) + if !ok { + return nil, false + } + result = append(result, castedInValue) + } + return result, true + } + return in, true +} + func InterfaceToStrStrMap(in interface{}) (map[string]string, bool) { if in == nil { return nil, true diff --git a/pkg/lib/cast/interface_test.go b/pkg/lib/cast/interface_test.go index f291e8c3f3..340eb4e5d3 100644 --- a/pkg/lib/cast/interface_test.go +++ b/pkg/lib/cast/interface_test.go @@ -17,6 +17,7 @@ limitations under the License. package cast import ( + "encoding/json" "testing" "github.com/stretchr/testify/require" @@ -186,6 +187,58 @@ func TestInterfaceToInterfaceInterfaceMap(t *testing.T) { require.Equal(t, expected, casted) } +func TestJSONMarshallable(t *testing.T) { + var ok bool + var in interface{} + var casted interface{} + var expected interface{} + var err error + + in = map[string]interface{}{"test": map[interface{}]interface{}{"testing": []string{}}} + expected = map[string]interface{}{"test": map[string]interface{}{"testing": []interface{}{}}} + casted, ok = JSONMarshallable(in) + require.True(t, ok) + require.Equal(t, expected, casted) + _, err = json.Marshal(casted) + require.Equal(t, err, nil) + + in = map[string]interface{}{"test": map[interface{}]interface{}{1: []string{}}, "slice": []int{1}} + casted, ok = JSONMarshallable(in) + require.False(t, ok) + + in = map[string]interface{}{"test": map[interface{}]interface{}{"1": []string{}}, "slice": []int{1}} + expected = map[string]interface{}{"test": map[string]interface{}{"1": []interface{}{}}, "slice": []interface{}{1}} + casted, ok = JSONMarshallable(in) + require.True(t, ok) + require.Equal(t, expected, casted) + _, err = json.Marshal(casted) + require.Equal(t, err, nil) + + in = map[string]interface{}{"test": nil} + expected = map[string]interface{}{"test": nil} + casted, ok = JSONMarshallable(in) + require.True(t, ok) + require.Equal(t, expected, casted) + _, err = json.Marshal(casted) + require.Equal(t, err, nil) + + in = map[string]interface{}{"slice": []interface{}{1, "1", map[interface{}]interface{}{"key": false}}} + expected = map[string]interface{}{"slice": []interface{}{1, "1", map[string]interface{}{"key": false}}} + casted, ok = JSONMarshallable(in) + require.True(t, ok) + require.Equal(t, expected, casted) + _, err = json.Marshal(casted) + require.Equal(t, err, nil) + + in = map[string]interface{}{} + expected = map[string]interface{}{} + casted, ok = JSONMarshallable(in) + require.True(t, ok) + require.Equal(t, expected, casted) + _, err = json.Marshal(casted) + require.Equal(t, err, nil) +} + func TestFlattenInterfaceSlices(t *testing.T) { expected := []interface{}{"a", "b", "c"} diff --git a/pkg/operator/endpoints/patch.go b/pkg/operator/endpoints/patch.go new file mode 100644 index 0000000000..7706ab090d --- /dev/null +++ b/pkg/operator/endpoints/patch.go @@ -0,0 +1,51 @@ +/* +Copyright 2020 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endpoints + +import ( + "io/ioutil" + "net/http" + + "github.com/cortexlabs/cortex/pkg/lib/errors" + "github.com/cortexlabs/cortex/pkg/operator/resources" +) + +func Patch(w http.ResponseWriter, r *http.Request) { + force := getOptionalBoolQParam("force", false, r) + + configFileName, err := getRequiredQueryParam("configFileName", r) + if err != nil { + respondError(w, r, errors.WithStack(err)) + return + } + + rw := http.MaxBytesReader(w, r.Body, 10<<20) + + bodyBytes, err := ioutil.ReadAll(rw) + if err != nil { + respondError(w, r, err) + return + } + + response, err := resources.Patch(bodyBytes, configFileName, force) + if err != nil { + respondError(w, r, err) + return + } + + respond(w, response) +} diff --git a/pkg/operator/main.go b/pkg/operator/main.go index 3e51dac9c3..8eaeb30dbe 100644 --- a/pkg/operator/main.go +++ b/pkg/operator/main.go @@ -96,6 +96,7 @@ func main() { routerWithAuth.HandleFunc("/info", endpoints.Info).Methods("GET") routerWithAuth.HandleFunc("/deploy", endpoints.Deploy).Methods("POST") + routerWithAuth.HandleFunc("/patch", endpoints.Patch).Methods("POST") routerWithAuth.HandleFunc("/refresh/{apiName}", endpoints.Refresh).Methods("POST") routerWithAuth.HandleFunc("/delete/{apiName}", endpoints.Delete).Methods("DELETE") routerWithAuth.HandleFunc("/get", endpoints.GetAPIs).Methods("GET") diff --git a/pkg/operator/resources/batchapi/api.go b/pkg/operator/resources/batchapi/api.go index 0c695fb18a..ae6067a356 100644 --- a/pkg/operator/resources/batchapi/api.go +++ b/pkg/operator/resources/batchapi/api.go @@ -49,10 +49,6 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string) (*spec.API, string, return nil, "", errors.Wrap(err, "upload api spec") } - if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey(config.Cluster.ClusterName)); err != nil { - return nil, "", errors.Wrap(err, "upload raw api spec") - } - err = applyK8sResources(api, prevVirtualService) if err != nil { go deleteK8sResources(api.Name) @@ -79,10 +75,6 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string) (*spec.API, string, return nil, "", errors.Wrap(err, "upload api spec") } - if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey(config.Cluster.ClusterName)); err != nil { - return nil, "", errors.Wrap(err, "upload raw api spec") - } - err = applyK8sResources(api, prevVirtualService) if err != nil { return nil, "", err diff --git a/pkg/operator/resources/realtimeapi/api.go b/pkg/operator/resources/realtimeapi/api.go index 4977d7a04c..0392254fa0 100644 --- a/pkg/operator/resources/realtimeapi/api.go +++ b/pkg/operator/resources/realtimeapi/api.go @@ -61,10 +61,6 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*spec.A return nil, "", errors.Wrap(err, "upload api spec") } - if err := config.UploadBytesToBucket(api.RawYAMLBytes, api.RawAPIKey(config.ClusterName())); err != nil { - return nil, "", errors.Wrap(err, "upload raw api spec") - } - // Use api spec indexed by PredictorID for replicas to prevent rolling updates when SpecID changes without PredictorID changing if err := config.UploadJSONToBucket(api, api.PredictorKey); err != nil { return nil, "", errors.Wrap(err, "upload predictor spec") @@ -103,10 +99,6 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*spec.A return nil, "", errors.Wrap(err, "upload api spec") } - if err := config.UploadBytesToBucket(api.RawYAMLBytes, api.RawAPIKey(config.ClusterName())); err != nil { - return nil, "", errors.Wrap(err, "upload raw api spec") - } - // Use api spec indexed by PredictorID for replicas to prevent rolling updates when SpecID changes without PredictorID changing if err := config.UploadJSONToBucket(api, api.PredictorKey); err != nil { return nil, "", errors.Wrap(err, "upload predictor spec") diff --git a/pkg/operator/resources/resources.go b/pkg/operator/resources/resources.go index 8a06e13f66..fb98a2244e 100644 --- a/pkg/operator/resources/resources.go +++ b/pkg/operator/resources/resources.go @@ -84,7 +84,6 @@ func Deploy(projectBytes []byte, configFileName string, configBytes []byte, forc projectFiles := ProjectFiles{ ProjectByteMap: projectFileMap, - ConfigFileName: configFileName, } var apiConfigs []userconfig.API @@ -177,6 +176,96 @@ func UpdateAPI(apiConfig *userconfig.API, projectID string, force bool) (*schema return nil, msg, err } +func Patch(configBytes []byte, configFileName string, force bool) ([]schema.DeployResult, error) { + var apiConfigs []userconfig.API + var err error + + if config.Provider == types.AWSProviderType { + apiConfigs, err = spec.ExtractAPIConfigs(configBytes, config.Provider, configFileName, &config.Cluster.Config, nil) + if err != nil { + return nil, err + } + } else { + apiConfigs, err = spec.ExtractAPIConfigs(configBytes, config.Provider, configFileName, nil, &config.GCPCluster.GCPConfig) + if err != nil { + return nil, err + } + } + + results := make([]schema.DeployResult, 0, len(apiConfigs)) + for i := range apiConfigs { + apiConfig := &apiConfigs[i] + result := schema.DeployResult{} + + apiSpec, msg, err := patchAPI(apiConfig, configFileName, force) + if err == nil && apiSpec != nil { + apiEndpoint, _ := operator.APIEndpoint(apiSpec) + + result.API = &schema.APIResponse{ + Spec: *apiSpec, + Endpoint: apiEndpoint, + } + } + + result.Message = msg + if err != nil { + result.Error = errors.ErrorStr(err) + } + + results = append(results, result) + } + return results, nil +} + +func patchAPI(apiConfig *userconfig.API, configFileName string, force bool) (*spec.API, string, error) { + deployedResource, err := GetDeployedResourceByName(apiConfig.Name) + if err != nil { + return nil, "", err + } + + if deployedResource.Kind == userconfig.UnknownKind { + return nil, "", ErrorOperationIsOnlySupportedForKind(*deployedResource, userconfig.RealtimeAPIKind, userconfig.BatchAPIKind, userconfig.TrafficSplitterKind) // unexpected + } + + var projectFiles ProjectFiles + + prevAPISpec, err := operator.DownloadAPISpec(deployedResource.Name, deployedResource.ID()) + if err != nil { + return nil, "", err + } + + if deployedResource.Kind != userconfig.TrafficSplitterKind { + bytes, err := config.AWS.ReadBytesFromS3(config.Cluster.Bucket, prevAPISpec.ProjectKey) + if err != nil { + return nil, "", err + } + + projectFileMap, err := archive.UnzipMemToMem(bytes) + if err != nil { + return nil, "", err + } + + projectFiles = ProjectFiles{ + ProjectByteMap: projectFileMap, + } + } + + err = ValidateClusterAPIs([]userconfig.API{*apiConfig}, projectFiles) + if err != nil { + err = errors.Append(err, fmt.Sprintf("\n\napi configuration schema can be found here:\n → Realtime API: https://docs.cortex.dev/v/%s/deployments/realtime-api/api-configuration\n → Batch API: https://docs.cortex.dev/v/%s/deployments/batch-api/api-configuration\n → Traffic Splitter: https://docs.cortex.dev/v/%s/deployments/realtime-api/traffic-splitter", consts.CortexVersionMinor, consts.CortexVersionMinor, consts.CortexVersionMinor)) + return nil, "", err + } + + switch deployedResource.Kind { + case userconfig.RealtimeAPIKind: + return realtimeapi.UpdateAPI(apiConfig, prevAPISpec.ProjectID, force) + case userconfig.BatchAPIKind: + return batchapi.UpdateAPI(apiConfig, prevAPISpec.ProjectID) + default: + return trafficsplitter.UpdateAPI(apiConfig, force) + } +} + func RefreshAPI(apiName string, force bool) (string, error) { deployedResource, err := GetDeployedResourceByName(apiName) if err != nil { diff --git a/pkg/operator/resources/trafficsplitter/api.go b/pkg/operator/resources/trafficsplitter/api.go index 276075a971..3a1aeea2aa 100644 --- a/pkg/operator/resources/trafficsplitter/api.go +++ b/pkg/operator/resources/trafficsplitter/api.go @@ -43,10 +43,6 @@ func UpdateAPI(apiConfig *userconfig.API, force bool) (*spec.API, string, error) return nil, "", errors.Wrap(err, "upload api spec") } - if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey(config.Cluster.ClusterName)); err != nil { - return nil, "", errors.Wrap(err, "upload raw api spec") - } - if err := applyK8sVirtualService(api, prevVirtualService); err != nil { go deleteK8sResources(api.Name) return nil, "", err @@ -65,10 +61,6 @@ func UpdateAPI(apiConfig *userconfig.API, force bool) (*spec.API, string, error) return nil, "", errors.Wrap(err, "upload api spec") } - if err := config.AWS.UploadBytesToS3(api.RawYAMLBytes, config.Cluster.Bucket, api.RawAPIKey(config.Cluster.ClusterName)); err != nil { - return nil, "", errors.Wrap(err, "upload raw api spec") - } - if err := applyK8sVirtualService(api, prevVirtualService); err != nil { return nil, "", err } diff --git a/pkg/operator/resources/validations.go b/pkg/operator/resources/validations.go index 99a5e57cec..1b4babd7be 100644 --- a/pkg/operator/resources/validations.go +++ b/pkg/operator/resources/validations.go @@ -38,7 +38,6 @@ import ( type ProjectFiles struct { ProjectByteMap map[string][]byte - ConfigFileName string } func (projectFiles ProjectFiles) AllPaths() []string { diff --git a/pkg/types/spec/api.go b/pkg/types/spec/api.go index 46b0c803ca..0622c5fc08 100644 --- a/pkg/types/spec/api.go +++ b/pkg/types/spec/api.go @@ -164,17 +164,6 @@ func KeysPrefix(apiName string, clusterName string) string { ) + "/" } -func (api API) RawAPIKey(clusterName string) string { - return filepath.Join( - clusterName, - "apis", - api.Name, - "raw_api", - api.ID, - consts.CortexVersion+"-cortex.yaml", - ) -} - func MetadataRoot(apiName string, clusterName string) string { return filepath.Join( clusterName, diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go index 08b7c0d2aa..ffda2368d1 100644 --- a/pkg/types/spec/validations.go +++ b/pkg/types/spec/validations.go @@ -43,7 +43,6 @@ import ( "github.com/cortexlabs/cortex/pkg/types" "github.com/cortexlabs/cortex/pkg/types/clusterconfig" "github.com/cortexlabs/cortex/pkg/types/userconfig" - "github.com/cortexlabs/yaml" dockertypes "github.com/docker/docker/api/types" kresource "k8s.io/apimachinery/pkg/api/resource" ) @@ -676,11 +675,12 @@ func ExtractAPIConfigs( api.Index = i api.FileName = configFileName - rawYAMLBytes, err := yaml.Marshal([]map[string]interface{}{data}) - if err != nil { - return nil, errors.Wrap(err, api.Identify()) + interfaceMap, ok := cast.JSONMarshallable(data) + if !ok { + return nil, errors.ErrorUnexpected("unable to cast api spec to json") // unexpected } - api.RawYAMLBytes = rawYAMLBytes + + api.SubmittedAPISpec = interfaceMap if resourceStruct.Kind == userconfig.RealtimeAPIKind || resourceStruct.Kind == userconfig.BatchAPIKind { api.ApplyDefaultDockerPaths() diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go index 90cd1f8c8c..184ef708c8 100644 --- a/pkg/types/userconfig/api.go +++ b/pkg/types/userconfig/api.go @@ -32,16 +32,16 @@ import ( type API struct { Resource - APIs []*TrafficSplit `json:"apis" yaml:"apis"` - Predictor *Predictor `json:"predictor" yaml:"predictor"` - Monitoring *Monitoring `json:"monitoring" yaml:"monitoring"` - Networking *Networking `json:"networking" yaml:"networking"` - Compute *Compute `json:"compute" yaml:"compute"` - Autoscaling *Autoscaling `json:"autoscaling" yaml:"autoscaling"` - UpdateStrategy *UpdateStrategy `json:"update_strategy" yaml:"update_strategy"` - Index int `json:"index" yaml:"-"` - FileName string `json:"file_name" yaml:"-"` - RawYAMLBytes []byte `json:"-" yaml:"-"` + APIs []*TrafficSplit `json:"apis" yaml:"apis"` + Predictor *Predictor `json:"predictor" yaml:"predictor"` + Monitoring *Monitoring `json:"monitoring" yaml:"monitoring"` + Networking *Networking `json:"networking" yaml:"networking"` + Compute *Compute `json:"compute" yaml:"compute"` + Autoscaling *Autoscaling `json:"autoscaling" yaml:"autoscaling"` + UpdateStrategy *UpdateStrategy `json:"update_strategy" yaml:"update_strategy"` + Index int `json:"index" yaml:"-"` + FileName string `json:"file_name" yaml:"-"` + SubmittedAPISpec interface{} `json:"submitted_api_spec" yaml:"submitted_api_spec"` } type Predictor struct { diff --git a/pkg/workloads/cortex/client/cortex/__init__.py b/pkg/workloads/cortex/client/cortex/__init__.py index e1982d4023..a6da59f5f0 100644 --- a/pkg/workloads/cortex/client/cortex/__init__.py +++ b/pkg/workloads/cortex/client/cortex/__init__.py @@ -44,7 +44,7 @@ def client(env: str): f"can't find environment {env}, create one by calling `cortex.cluster_client()`" ) - return Client(env) + return Client(environment) def local_client( diff --git a/pkg/workloads/cortex/client/cortex/client.py b/pkg/workloads/cortex/client/cortex/client.py index 31ee55e3ec..94c41a0cb0 100644 --- a/pkg/workloads/cortex/client/cortex/client.py +++ b/pkg/workloads/cortex/client/cortex/client.py @@ -34,14 +34,15 @@ class Client: - def __init__(self, env: str): + def __init__(self, env: dict): """ A client to deploy and manage APIs in the specified environment. Args: - env: Name of the environment to use. + env: Environment config """ self.env = env + self.env_name = env["name"] # CORTEX_VERSION_MINOR x5 def deploy( @@ -75,6 +76,11 @@ def deploy( Deployment status, API specification, and endpoint for each API. """ + if self.env["provider"] == "gcp" and wait: + raise ValueError( + "`wait` flag is not supported for clusters on GCP, please set the `wait` flag to false" + ) + if project_dir is not None and predictor is not None: raise ValueError( "`predictor` and `project_dir` parameters cannot be specified at the same time, please choose one" @@ -170,7 +176,7 @@ def _deploy( "deploy", config_file, "--env", - self.env, + self.env_name, "-o", "mixed", "-y", @@ -188,6 +194,10 @@ def _deploy( if not wait: return deploy_result + # logging immediately will show previous versions of the replica terminating; + # wait a few seconds for the new replicas to start initializing + time.sleep(5) + def stream_to_stdout(process): for c in iter(lambda: process.stdout.read(1), ""): sys.stdout.write(c) @@ -200,7 +210,7 @@ def stream_to_stdout(process): env = os.environ.copy() env["CORTEX_CLI_INVOKER"] = "python" process = subprocess.Popen( - [get_cli_path(), "logs", "--env", self.env, api_name], + [get_cli_path(), "logs", "--env", self.env_name, api_name], stderr=subprocess.STDOUT, stdout=subprocess.PIPE, encoding="utf8", @@ -214,7 +224,7 @@ def stream_to_stdout(process): while process.poll() is None: api = self.get_api(api_name) if api["status"]["status_code"] != "status_updating": - time.sleep(10) # wait for logs to stream + time.sleep(5) # accommodate latency in log streaming from the cluster process.terminate() break time.sleep(5) @@ -232,7 +242,7 @@ def get_api(self, api_name: str) -> dict: Returns: Information about the API, including the API specification, endpoint, status, and metrics (if applicable). """ - output = run_cli(["get", api_name, "--env", self.env, "-o", "json"], hide_output=True) + output = run_cli(["get", api_name, "--env", self.env_name, "-o", "json"], hide_output=True) apis = json.loads(output.strip()) return apis[0] @@ -244,7 +254,7 @@ def list_apis(self) -> list: Returns: List of APIs, including information such as the API specification, endpoint, status, and metrics (if applicable). """ - args = ["get", "-o", "json", "--env", self.env] + args = ["get", "-o", "json", "--env", self.env_name] output = run_cli(args, hide_output=True) @@ -261,7 +271,7 @@ def get_job(self, api_name: str, job_id: str) -> dict: Returns: Information about the job, including the job status, worker status, and job progress. """ - args = ["get", api_name, job_id, "--env", self.env, "-o", "json"] + args = ["get", api_name, job_id, "--env", self.env_name, "-o", "json"] output = run_cli(args, hide_output=True) @@ -275,13 +285,35 @@ def refresh(self, api_name: str, force: bool = False): api_name: Name of the API to refresh. force: Override an already in-progress API update. """ - args = ["refresh", api_name, "--env", self.env, "-o", "json"] + args = ["refresh", api_name, "--env", self.env_name, "-o", "json"] if force: args.append("--force") run_cli(args, hide_output=True) + def patch(self, api_spec: dict, force: bool = False) -> dict: + """ + Update the api specification for an API that has already been deployed. + + Args: + api_spec: The new api specification to apply + force: Override an already in-progress API update. + """ + + cortex_yaml_file = ( + Path.home() / ".cortex" / "deployments" / f"cortex-{str(uuid.uuid4())}.yaml" + ) + with util.open_temporarily(cortex_yaml_file, "w") as f: + yaml.dump([api_spec], f) + args = ["patch", cortex_yaml_file, "--env", self.env_name, "-o", "json"] + + if force: + args.append("--force") + + output = run_cli(args, hide_output=True) + return json.loads(output.strip()) + def delete_api(self, api_name: str, keep_cache: bool = False): """ Delete an API. @@ -294,7 +326,7 @@ def delete_api(self, api_name: str, keep_cache: bool = False): "delete", api_name, "--env", - self.env, + self.env_name, "--force", "-o", "json", @@ -318,7 +350,7 @@ def stop_job(self, api_name: str, job_id: str, keep_cache: bool = False): api_name, job_id, "--env", - self.env, + self.env_name, "-o", "json", ] @@ -335,7 +367,7 @@ def stream_api_logs( Args: api_name: Name of the API. """ - args = ["logs", api_name, "--env", self.env] + args = ["logs", api_name, "--env", self.env_name] run_cli(args) def stream_job_logs( @@ -350,5 +382,5 @@ def stream_job_logs( api_name: Name of the Batch API. job_id: Job ID. """ - args = ["logs", api_name, job_id, "--env", self.env] + args = ["logs", api_name, job_id, "--env", self.env_name] run_cli(args) diff --git a/pkg/workloads/cortex/client/cortex/util.py b/pkg/workloads/cortex/client/cortex/util.py index e7ea880a5d..08f5e97f94 100644 --- a/pkg/workloads/cortex/client/cortex/util.py +++ b/pkg/workloads/cortex/client/cortex/util.py @@ -20,6 +20,7 @@ @contextmanager def open_temporarily(path, mode): + Path(path).parent.mkdir(parents=True, exist_ok=True) file = open(path, mode) try: