diff --git a/cli/cmd/lib_realtime_apis.go b/cli/cmd/lib_realtime_apis.go index dd4021dbd4..1d335716d3 100644 --- a/cli/cmd/lib_realtime_apis.go +++ b/cli/cmd/lib_realtime_apis.go @@ -68,7 +68,7 @@ func realtimeAPITable(realtimeAPI schema.APIResponse, env cliconfig.Environment) out += "\n" + console.Bold("endpoint: ") + realtimeAPI.Endpoint + "\n" - if !(realtimeAPI.Spec.Predictor.Type == userconfig.PythonPredictorType && realtimeAPI.Spec.Predictor.ModelPath == nil && realtimeAPI.Spec.Predictor.Models == nil) { + if !(realtimeAPI.Spec.Predictor.Type == userconfig.PythonPredictorType && realtimeAPI.Spec.Predictor.MultiModelReloading == nil) { out += "\n" + describeModelInput(realtimeAPI.Status, realtimeAPI.Spec.Predictor, realtimeAPI.Endpoint) } diff --git a/cli/local/docker_spec.go b/cli/local/docker_spec.go index b261a76972..ed4cdabbcb 100644 --- a/cli/local/docker_spec.go +++ b/cli/local/docker_spec.go @@ -96,7 +96,7 @@ func getAPIEnv(api *spec.API, awsClient *aws.Client, gcpClient *gcp.Client) []st "CORTEX_MAX_REPLICA_CONCURRENCY="+s.Int32(api.Predictor.ProcessesPerReplica*api.Predictor.ThreadsPerProcess+1024), // allow a queue of 1024 ) - if api.Predictor.ModelPath != nil || api.Predictor.Models != nil { + if api.Predictor.Type != userconfig.PythonPredictorType || api.Predictor.MultiModelReloading != nil { envs = append(envs, "CORTEX_MODEL_DIR="+_modelDir) } diff --git a/cli/local/model_cache.go b/cli/local/model_cache.go index 175afb028f..2f582ef8e8 100644 --- a/cli/local/model_cache.go +++ b/cli/local/model_cache.go @@ -36,6 +36,16 @@ func CacheLocalModels(apiSpec *spec.API, models []spec.CuratedModelResource) err var localModelCache *spec.LocalModelCache localModelCaches := make([]*spec.LocalModelCache, 0) + var predictorModels *userconfig.MultiModels + var predictorModelsKey string + if apiSpec.Predictor.Models != nil { + predictorModels = apiSpec.Predictor.Models + predictorModelsKey = userconfig.ModelsKey + } else if apiSpec.Predictor.MultiModelReloading != nil { + predictorModels = apiSpec.Predictor.MultiModelReloading + predictorModelsKey = userconfig.MultiModelReloadingKey + } + modelsThatWereCachedAlready := 0 for _, model := range models { if !model.LocalPath { @@ -44,12 +54,12 @@ func CacheLocalModels(apiSpec *spec.API, models []spec.CuratedModelResource) err localModelCache, wasAlreadyCached, err = cacheLocalModel(model) if err != nil { - if apiSpec.Predictor.ModelPath != nil { - return errors.Wrap(err, apiSpec.Identify(), userconfig.PredictorKey, userconfig.ModelPathKey) - } else if apiSpec.Predictor.Models != nil && apiSpec.Predictor.Models.Dir != nil { - return errors.Wrap(err, apiSpec.Identify(), userconfig.PredictorKey, userconfig.ModelsKey, userconfig.ModelsDirKey, model.Name, *apiSpec.Predictor.Models.Dir) + if predictorModels.Path != nil { + return errors.Wrap(err, apiSpec.Identify(), userconfig.PredictorKey, predictorModelsKey, userconfig.ModelsPathKey) + } else if predictorModels.Dir != nil { + return errors.Wrap(err, apiSpec.Identify(), userconfig.PredictorKey, predictorModelsKey, userconfig.ModelsDirKey, model.Name, *predictorModels.Dir) } - return errors.Wrap(err, apiSpec.Identify(), userconfig.PredictorKey, userconfig.ModelsKey, userconfig.ModelsPathsKey, model.Name, userconfig.ModelPathKey) + return errors.Wrap(err, apiSpec.Identify(), userconfig.PredictorKey, predictorModelsKey, userconfig.ModelsPathsKey, model.Name, userconfig.ModelsPathKey) } if wasAlreadyCached { modelsThatWereCachedAlready++ @@ -79,7 +89,7 @@ func cacheLocalModel(model
spec.CuratedModelResource) (*spec.LocalModelCache, bo return nil, false, nil } - hash, err := localModelHash(model.ModelPath) + hash, err := localModelHash(model.Path) if err != nil { return nil, false, err } @@ -130,7 +140,7 @@ func cacheLocalModel(model spec.CuratedModelResource) (*spec.LocalModelCache, bo if len(model.Versions) == 0 { destModelDir = filepath.Join(destModelDir, "1") } - if err := files.CopyDirOverwrite(strings.TrimSuffix(model.ModelPath, "/"), s.EnsureSuffix(destModelDir, "/")); err != nil { + if err := files.CopyDirOverwrite(strings.TrimSuffix(model.Path, "/"), s.EnsureSuffix(destModelDir, "/")); err != nil { return nil, false, err } diff --git a/docs/workloads/batch/configuration.md b/docs/workloads/batch/configuration.md index 7ef40fcb72..bd7f138a68 100644 --- a/docs/workloads/batch/configuration.md +++ b/docs/workloads/batch/configuration.md @@ -32,13 +32,14 @@ predictor: type: tensorflow path: # path to a python file with a TensorFlowPredictor class definition, relative to the Cortex root (required) - model_path: # S3 path to an exported model (e.g. s3://my-bucket/exported_model) (either this or 'models' must be provided) - signature_key: # name of the signature def to use for prediction (required if your model has more than one signature def) - models: # use this when multiple models per API are desired (either this or 'model_path' must be provided) - - name: # unique name for the model (e.g. text-generator) (required) - model_path: # S3 path to an exported model (e.g. s3://my-bucket/exported_model) (required) - signature_key: # name of the signature def to use for prediction (required if your model has more than one signature def) - ... + models: # use this to serve a single model or multiple ones + path: # S3 path to an exported model (e.g. s3://my-bucket/exported_model) (either this or 'paths' must be provided) + paths: # (either this or 'path' must be provided) + - name: # unique name for the model (e.g. text-generator) (required) + path: # S3 path to an exported model (e.g. s3://my-bucket/exported_model) (required) + signature_key: # name of the signature def to use for prediction (required if your model has more than one signature def) + ... + signature_key: # name of the signature def to use for prediction (required if your model has more than one signature def) server_side_batching: # (optional) max_batch_size: # the maximum number of requests to aggregate before running inference batch_interval: # the maximum amount of time to spend waiting for additional requests before running inference on the batch of requests @@ -66,11 +67,12 @@ predictor: type: onnx path: # path to a python file with an ONNXPredictor class definition, relative to the Cortex root (required) - model_path: # S3 path to an exported model (e.g. s3://my-bucket/exported_model.onnx) (either this or 'models' must be provided) - models: # use this when multiple models per API are desired (either this or 'model_path' must be provided) - - name: # unique name for the model (e.g. text-generator) (required) - model_path: # S3 path to an exported model (e.g. s3://my-bucket/exported_model.onnx) (required) - ... + models: # use this to serve a single model or multiple ones + path: # S3 path to an exported model (e.g. s3://my-bucket/exported_model.onnx) (either this or 'paths' must be provided) + paths: # (either this or 'path' must be provided) + - name: # unique name for the model (e.g. text-generator) (required) + path: # S3 path to an exported model (e.g. 
s3://my-bucket/exported_model.onnx) (required) + ... config: # arbitrary dictionary passed to the constructor of the Predictor (can be overridden by config passed in job submission) (optional) python_path: # path to the root of your Python folder that will be appended to PYTHONPATH (default: folder containing cortex.yaml) image: # docker image to use for the Predictor (default: quay.io/cortexlabs/onnx-predictor-gpu:master or quay.io/cortexlabs/onnx-predictor-cpu:master based on compute) diff --git a/docs/workloads/multi-model/configuration.md b/docs/workloads/multi-model/configuration.md index cf637ef254..070dd9f2b9 100644 --- a/docs/workloads/multi-model/configuration.md +++ b/docs/workloads/multi-model/configuration.md @@ -14,7 +14,8 @@ The directory `s3://cortex-examples/sklearn/mpg-estimator/linreg/` contains 4 di predictor: type: python path: predictor.py - model_path: s3://cortex-examples/sklearn/mpg-estimator/linreg/ + models: + path: s3://cortex-examples/sklearn/mpg-estimator/linreg/ ``` #### `predictor.py` @@ -94,11 +95,11 @@ class PythonPredictor: models: paths: - name: inception - model_path: s3://cortex-examples/tensorflow/image-classifier/inception/ + path: s3://cortex-examples/tensorflow/image-classifier/inception/ - name: iris - model_path: s3://cortex-examples/tensorflow/iris-classifier/nn/ + path: s3://cortex-examples/tensorflow/iris-classifier/nn/ - name: resnet50 - model_path: s3://cortex-examples/tensorflow/resnet50/ + path: s3://cortex-examples/tensorflow/resnet50/ ... ``` @@ -130,11 +131,11 @@ class TensorFlowPredictor: models: paths: - name: resnet50 - model_path: s3://cortex-examples/onnx/resnet50/ + path: s3://cortex-examples/onnx/resnet50/ - name: mobilenet - model_path: s3://cortex-examples/onnx/mobilenet/ + path: s3://cortex-examples/onnx/mobilenet/ - name: shufflenet - model_path: s3://cortex-examples/onnx/shufflenet/ + path: s3://cortex-examples/onnx/shufflenet/ ... ``` diff --git a/docs/workloads/realtime/configuration.md b/docs/workloads/realtime/configuration.md index c3f3165617..0d7c713c33 100644 --- a/docs/workloads/realtime/configuration.md +++ b/docs/workloads/realtime/configuration.md @@ -9,13 +9,13 @@ predictor: type: python path: # path to a python file with a PythonPredictor class definition, relative to the Cortex root (required) - model_path: # S3 path to an exported model directory (e.g. s3://my-bucket/exported_model/) (optional, cannot be provided along with 'models') - models: # use this to serve multiple models in a single API (optional, cannot be provided along with 'model_path') - dir: # S3 path to a directory containing multiple models (e.g. s3://my-bucket/models/) (either this or 'paths' must be provided) - paths: # list of S3 paths to exported model directories (either this or 'dir' must be provided) + multi_model_reloading: # use this to serve a single model or multiple ones with live reloading (optional) + path: # S3 path to an exported model directory (e.g. s3://my-bucket/exported_model/) (either this, 'dir', or 'paths' must be provided) + paths: # list of S3 paths to exported model directories (either this, 'dir', or 'path' must be provided) - name: # unique name for the model (e.g. text-generator) (required) - model_path: # S3 path to an exported model directory (e.g. s3://my-bucket/exported_model/) (required) + path: # S3 path to an exported model directory (e.g. s3://my-bucket/exported_model/) (required) ... + dir: # S3 path to a directory containing multiple models (e.g. 
s3://my-bucket/models/) (either this, 'path', or 'paths' must be provided) cache_size: # the number of models to keep in memory (optional; all models are kept in memory by default) disk_cache_size: # the number of models to keep on disk (optional; all models are kept on disk by default) server_side_batching: # (optional) @@ -62,16 +62,15 @@ predictor: type: tensorflow path: # path to a python file with a TensorFlowPredictor class definition, relative to the Cortex root (required) - model_path: # S3 path to an exported model directory (e.g. s3://my-bucket/exported_model/) (either this or 'models' must be provided) - signature_key: # name of the signature def to use for prediction (required if your model has more than one signature def) - models: # use this to serve multiple models in a single API (either this or 'model_path' must be provided) - dir: # S3 path to a directory containing multiple models (e.g. s3://my-bucket/models/) (either this or 'paths' must be provided) - paths: # list of S3 paths to exported model directories (either this or 'dir' must be provided) + models: # use this to serve a single model or multiple ones (required) + path: # S3 path to an exported model directory (e.g. s3://my-bucket/exported_model/) (either this, 'dir', or 'paths' must be provided) + paths: # list of S3 paths to exported model directories (either this, 'dir', or 'path' must be provided) - name: # unique name for the model (e.g. text-generator) (required) - model_path: # S3 path to an exported model directory (e.g. s3://my-bucket/exported_model/) (required) + path: # S3 path to an exported model directory (e.g. s3://my-bucket/exported_model/) (required) signature_key: # name of the signature def to use for prediction (required if your model has more than one signature def) ... - signature_key: # name of the signature def to use for prediction for 'dir'-specified models or for models specified using 'paths' that haven't had a signature key set + dir: # S3 path to a directory containing multiple models (e.g. s3://my-bucket/models/) (either this, 'path', or 'paths' must be provided) + signature_key: # name of the signature def to use for prediction (required if your model has more than one signature def) cache_size: # the number of models to keep in memory (optional; all models are kept in memory by default) disk_cache_size: # the number of models to keep on disk (optional; all models are kept on disk by default) server_side_batching: # (optional) @@ -119,13 +118,13 @@ predictor: type: onnx path: # path to a python file with an ONNXPredictor class definition, relative to the Cortex root (required) - model_path: # S3 path to an exported model directory (e.g. s3://my-bucket/exported_model/) (either this or 'models' must be provided) - models: # use this to serve multiple models in a single API (either this or 'model_path' must be provided) - dir: # S3 path to a directory containing multiple models (e.g. s3://my-bucket/models/) (either this or 'paths' must be provided) - paths: # list of S3 paths to exported model directories (either this or 'dir' must be provided) + models: # use this to serve a single model or multiple ones (required) + path: # S3 path to an exported model directory (e.g. s3://my-bucket/exported_model/) (either this, 'dir', or 'paths' must be provided) + paths: # list of S3 paths to exported model directories (either this, 'dir', or 'path' must be provided) - name: # unique name for the model (e.g. text-generator) (required) - model_path: # S3 path to an exported model directory (e.g. 
s3://my-bucket/exported_model/) (required) + path: # S3 path to an exported model directory (e.g. s3://my-bucket/exported_model/) (required) ... + dir: # S3 path to a directory containing multiple models (e.g. s3://my-bucket/models/) (either this, 'path', or 'paths' must be provided) cache_size: # the number of models to keep in memory (optional; all models are kept in memory by default) disk_cache_size: # the number of models to keep on disk (optional; all models are kept on disk by default) processes_per_replica: # the number of parallel serving processes to run on each replica (default: 1) diff --git a/docs/workloads/realtime/models.md b/docs/workloads/realtime/models.md index 5ff78ba062..b9108d9f2e 100644 --- a/docs/workloads/realtime/models.md +++ b/docs/workloads/realtime/models.md @@ -102,7 +102,7 @@ or for a versioned model: ## Single model -The most common pattern is to serve a single model per API. The path to the model is specified in the `model_path` field in the `predictor` configuration. For example: +The most common pattern is to serve a single model per API. The path to the model is specified in the `path` field in the `predictor.models` configuration. For example: ```yaml # cortex.yaml @@ -111,10 +111,10 @@ The most common pattern is to serve a single model per API. The mode kind: RealtimeAPI predictor: # ... - model_path: s3://my-bucket/models/text-generator/ + models: + path: s3://my-bucket/models/text-generator/ ``` -Note: for the Python predictor type, it is not necessary to specify the path to your model in `model_path`, since you can download and load it in your predictor's `__init__()` function. That said, it is necessary to use the `model_path` field to take advantage of [live model reloading](#live-model-reloading). +For the Python predictor type, the `models` field is named `multi_model_reloading`. It is not necessary to specify the `multi_model_reloading` section at all, since you can download and load the model in your predictor's `__init__()` function. That said, it is necessary to use the `path` field to take advantage of [live model reloading](#live-model-reloading). ## Multiple models @@ -147,7 +148,8 @@ or: dir: s3://my-bucket/models/ ``` -Note: for the Python predictor type, it is not necessary to specify the paths to your models in `models`, since you can download and load them in your predictor's `__init__()` function. That said, it is necessary to use the `models` field to take advantage of live reloading or multi model caching (see below). + +For the Python predictor type, the `models` field is named `multi_model_reloading`. It is not necessary to specify the `multi_model_reloading` section at all, since you can download and load the models in your predictor's `__init__()` function. That said, it is necessary to use the `multi_model_reloading` field to take advantage of [live model reloading](#live-model-reloading) or [multi-model caching](#multi-model-caching). When using the `models.paths` field, each path must be a valid model directory (see above for valid model directory structures). @@ -168,7 +170,7 @@ In this case, there are two models in the directory, one of which is named "text ## Live model reloading -Live model reloading is a mechanism that periodically checks for updated models in the model path(s) provided in `predictor.model_path` or `predictor.models`. 
It is automatically enabled for all predictor types, including the Python predictor type (as long as model paths are specified via `multi_model_reloading` in the `predictor` configuration). The following is a list of events that will trigger the API to update its model(s): @@ -181,7 +183,7 @@ Usage varies based on the predictor type: ### Python -To use live model reloading with the Python predictor, the model path(s) must be specified in the API's `predictor` configuration (via the `model_path` or `models` field). When models are specified in this manner, your `PythonPredictor` class must implement the `load_model()` function, and models can be retrieved by using the `get_model()` method of the `python_client` that's passed into your predictor's constructor. +To use live model reloading with the Python predictor, the model path(s) must be specified in the API's `predictor` configuration, via the `multi_model_reloading` field. When models are specified in this manner, your `PythonPredictor` class must implement the `load_model()` function, and models can be retrieved by using the `get_model()` method of the `python_client` that's passed into your predictor's constructor. The `load_model()` function that you implement in your `PythonPredictor` can return anything that you need to make a prediction. There is one caveat: whatever the return value is, it must be unloadable from memory via the `del` keyword. The following frameworks have been tested to work: diff --git a/docs/workloads/realtime/predictors.md b/docs/workloads/realtime/predictors.md index 555c33de51..dfdc59bf90 100644 --- a/docs/workloads/realtime/predictors.md +++ b/docs/workloads/realtime/predictors.md @@ -59,8 +59,7 @@ class PythonPredictor: the model and/or metadata. python_client (optional): Python client which is used to retrieve models for prediction. This should be saved for use in predict(). - Required when `predictor.model_path` or `predictor.models` is - specified in the api configuration. + Required when `predictor.multi_model_reloading` is specified in the api configuration. """ self.client = python_client # optional @@ -103,7 +102,7 @@ class PythonPredictor: def load_model(self, model_path): """(Optional) Called by Cortex to load a model when necessary. - This method is required when `predictor.model_path` or `predictor.models` + This method is required when the `predictor.multi_model_reloading` field is specified in the api configuration. 
Warning: this method must not make any modification to the model's diff --git a/pkg/cortex/serve/cortex_internal/lib/api/predictor.py b/pkg/cortex/serve/cortex_internal/lib/api/predictor.py index 89446eed17..6a148026c7 100644 --- a/pkg/cortex/serve/cortex_internal/lib/api/predictor.py +++ b/pkg/cortex/serve/cortex_internal/lib/api/predictor.py @@ -98,11 +98,19 @@ def __init__(self, provider: str, api_spec: dict, model_dir: str): # model side-reloading is supported for any number of processes_per_replica if self.caching_enabled: + if self.type == PythonPredictorType: + mem_cache_size = self.api_spec["predictor"]["multi_model_reloading"]["cache_size"] + disk_cache_size = self.api_spec["predictor"]["multi_model_reloading"][ + "disk_cache_size" + ] + else: + mem_cache_size = self.api_spec["predictor"]["models"]["cache_size"] + disk_cache_size = self.api_spec["predictor"]["models"]["disk_cache_size"] self.models = ModelsHolder( self.type, self.model_dir, - mem_cache_size=self.api_spec["predictor"]["models"]["cache_size"], - disk_cache_size=self.api_spec["predictor"]["models"]["disk_cache_size"], + mem_cache_size=mem_cache_size, + disk_cache_size=disk_cache_size, on_download_callback=model_downloader, ) elif not self.caching_enabled and self.type not in [ @@ -289,15 +297,15 @@ def _get_class_impl(self, module_name, impl_path, target_class_name): def _is_model_caching_enabled(self) -> bool: """ - Checks if model caching is enabled (models:cache_size and models:disk_cache_size). + Checks if model caching is enabled. """ - if ( - self.api_spec["predictor"]["models"] - and self.api_spec["predictor"]["models"]["cache_size"] is not None - and self.api_spec["predictor"]["models"]["disk_cache_size"] is not None - ): - return True - return False + models = None + if self.type != PythonPredictorType and self.api_spec["predictor"]["models"]: + models = self.api_spec["predictor"]["models"] + if self.type == PythonPredictorType and self.api_spec["predictor"]["multi_model_reloading"]: + models = self.api_spec["predictor"]["multi_model_reloading"] + + return models and models["cache_size"] and models["disk_cache_size"] def __del__(self) -> None: for cron in self.crons: @@ -313,15 +321,16 @@ def _are_models_specified(api_spec: dict) -> bool: Args: api_spec: API configuration. 
""" - if api_spec["predictor"]["model_path"] is not None: - return True + predictor_type = predictor_type_from_api_spec(api_spec) - if api_spec["predictor"]["models"] and ( - api_spec["predictor"]["models"]["dir"] is not None - or len(api_spec["predictor"]["models"]["paths"]) > 0 - ): - return True - return False + if predictor_type == PythonPredictorType and api_spec["predictor"]["multi_model_reloading"]: + models = api_spec["predictor"]["multi_model_reloading"] + elif predictor_type != PythonPredictorType: + models = api_spec["predictor"]["models"] + else: + return False + + return models is not None PYTHON_CLASS_VALIDATION = { diff --git a/pkg/cortex/serve/cortex_internal/lib/client/onnx.py b/pkg/cortex/serve/cortex_internal/lib/client/onnx.py index a0268cd5d3..f08cadcbe4 100644 --- a/pkg/cortex/serve/cortex_internal/lib/client/onnx.py +++ b/pkg/cortex/serve/cortex_internal/lib/client/onnx.py @@ -119,10 +119,10 @@ def _validate_model_args( "model_version must be either a parse-able numeric value or 'latest'" ) - # when predictor:model_path or predictor:models:paths is specified + # when predictor:models:path or predictor:models:paths is specified if not self._models_dir: - # when predictor:model_path is provided + # when when predictor:models:path is provided if consts.SINGLE_MODEL_NAME in self._spec_model_names: return consts.SINGLE_MODEL_NAME, model_version diff --git a/pkg/cortex/serve/cortex_internal/lib/client/python.py b/pkg/cortex/serve/cortex_internal/lib/client/python.py index 71931227d6..e48bd894e5 100644 --- a/pkg/cortex/serve/cortex_internal/lib/client/python.py +++ b/pkg/cortex/serve/cortex_internal/lib/client/python.py @@ -71,8 +71,8 @@ def __init__( self._spec_models = get_models_from_api_spec(api_spec) if ( - self._api_spec["predictor"]["models"] - and self._api_spec["predictor"]["models"]["dir"] is not None + self._api_spec["predictor"]["multi_model_reloading"] + and self._api_spec["predictor"]["multi_model_reloading"]["dir"] ): self._models_dir = True else: @@ -111,10 +111,10 @@ def get_model(self, model_name: Optional[str] = None, model_version: str = "late "model_version must be either a parse-able numeric value or 'latest'" ) - # when predictor:model_path or predictor:models:paths is specified + # when predictor:models:path or predictor:models:paths is specified if not self._models_dir: - # when predictor:model_path is provided + # when predictor:models:path is provided if consts.SINGLE_MODEL_NAME in self._spec_model_names: model_name = consts.SINGLE_MODEL_NAME model = self._get_model(model_name, model_version) @@ -381,9 +381,9 @@ def _is_model_caching_enabled(self) -> bool: Checks if model caching is enabled (models:cache_size and models:disk_cache_size). 
""" return ( - self._api_spec["predictor"]["models"] - and self._api_spec["predictor"]["models"]["cache_size"] is not None - and self._api_spec["predictor"]["models"]["disk_cache_size"] is not None + self._api_spec["predictor"]["multi_model_reloading"] + and self._api_spec["predictor"]["multi_model_reloading"]["cache_size"] + and self._api_spec["predictor"]["multi_model_reloading"]["disk_cache_size"] ) @property diff --git a/pkg/cortex/serve/cortex_internal/lib/client/tensorflow.py b/pkg/cortex/serve/cortex_internal/lib/client/tensorflow.py index 1393b85781..fb3539f0c4 100644 --- a/pkg/cortex/serve/cortex_internal/lib/client/tensorflow.py +++ b/pkg/cortex/serve/cortex_internal/lib/client/tensorflow.py @@ -106,10 +106,10 @@ def predict( "model_version must be either a parse-able numeric value or 'latest'" ) - # when predictor:model_path or predictor:models:paths is specified + # when ppredictor:models:path or predictor:models:paths is specified if not self._models_dir: - # when predictor:model_path is provided + # when predictor:models:path is provided if consts.SINGLE_MODEL_NAME in self._spec_model_names: return self._run_inference(model_input, consts.SINGLE_MODEL_NAME, model_version) diff --git a/pkg/cortex/serve/cortex_internal/lib/model/cron.py b/pkg/cortex/serve/cortex_internal/lib/model/cron.py index 06d24e188c..40e204a453 100644 --- a/pkg/cortex/serve/cortex_internal/lib/model/cron.py +++ b/pkg/cortex/serve/cortex_internal/lib/model/cron.py @@ -142,21 +142,30 @@ def __init__( self._local_model_names = self._spec_models.get_local_model_names() self._cloud_model_names = self._spec_models.get_cloud_model_names() for model_name in self._cloud_model_names: - self._s3_paths.append(self._spec_models[model_name]["model_path"]) + self._s3_paths.append(self._spec_models[model_name]["path"]) + + self._predictor_type = predictor_type_from_api_spec(self._api_spec) if ( - self._api_spec["predictor"]["model_path"] is None - and self._api_spec["predictor"]["models"] is not None - and self._api_spec["predictor"]["models"]["dir"] is not None + self._predictor_type == PythonPredictorType + and self._api_spec["predictor"]["multi_model_reloading"] ): + models = self._api_spec["predictor"]["multi_model_reloading"] + elif self._predictor_type != PythonPredictorType: + models = self._api_spec["predictor"]["models"] + else: + models = None + + if models is None: + raise CortexException("no specified model") + + if models["dir"] is not None: self._is_dir_used = True - self._models_dir = self._api_spec["predictor"]["models"]["dir"] + self._models_dir = models["dir"] else: self._is_dir_used = False self._models_dir = None - self._predictor_type = predictor_type_from_api_spec(self._api_spec) - try: os.mkdir(self._lock_dir) except FileExistsError: @@ -209,7 +218,7 @@ def ran_once(self) -> bool: def _make_local_models_available(self) -> None: """ - Make local models (provided through predictor:model_path, models:paths or models:dir) available on disk. + Make local models (provided through models:path, models:paths or models:dir fields) available on disk. 
""" timestamp_utc = datetime.datetime.now(datetime.timezone.utc).timestamp() @@ -243,7 +252,7 @@ def _make_local_models_available(self) -> None: if idx + 1 < len(self._local_model_names): message += ", " else: - message += "now available on disk" + message += " now available on disk" logger().info(message) @@ -647,7 +656,7 @@ def find_ondisk_model_info(lock_dir: str, model_name: str) -> Tuple[List[str], L Args: lock_dir: Path to where the resource locks are stored. - model_name: Name of the model as specified in predictor:models:paths:name, _cortex_default when predictor:model_path is set or the discovered model names when predictor:models:dir is used. + model_name: Name of the model as specified in predictor:models:paths:name, _cortex_default when predictor:models:path is set or the discovered model names when predictor:models:dir is used. Returns: 2-element tuple made of a list with the available versions and a list with the corresponding timestamps for each model. Empty when the model is not available. @@ -728,11 +737,10 @@ def __init__( self._local_model_names = self._spec_models.get_local_model_names() self._cloud_model_names = self._spec_models.get_cloud_model_names() for model_name in self._cloud_model_names: - self._cloud_paths.append(self._spec_models[model_name]["model_path"]) + self._cloud_paths.append(self._spec_models[model_name]["path"]) if ( - self._api_spec["predictor"]["model_path"] is None - and self._api_spec["predictor"]["models"] is not None + self._api_spec["predictor"]["models"] is not None and self._api_spec["predictor"]["models"]["dir"] is not None ): self._is_dir_used = True @@ -1529,21 +1537,30 @@ def __init__(self, interval: int, api_spec: dict, tree: ModelsTree, ondisk_model self._spec_models = get_models_from_api_spec(self._api_spec) self._cloud_model_names = self._spec_models.get_cloud_model_names() for model_name in self._cloud_model_names: - self._cloud_paths.append(self._spec_models[model_name]["model_path"]) + self._cloud_paths.append(self._spec_models[model_name]["path"]) + + self._predictor_type = predictor_type_from_api_spec(self._api_spec) if ( - self._api_spec["predictor"]["model_path"] is None - and self._api_spec["predictor"]["models"] is not None - and self._api_spec["predictor"]["models"]["dir"] is not None + self._predictor_type == PythonPredictorType + and self._api_spec["predictor"]["multi_model_reloading"] ): + models = self._api_spec["predictor"]["multi_model_reloading"] + elif self._predictor_type != PythonPredictorType: + models = self._api_spec["predictor"]["models"] + else: + models = None + + if models is None: + raise CortexException("no specified model") + + if models and models["dir"] is not None: self._is_dir_used = True - self._models_dir = self._api_spec["predictor"]["models"]["dir"] + self._models_dir = models["dir"] else: self._is_dir_used = False self._models_dir = None - self._predictor_type = predictor_type_from_api_spec(self._api_spec) - self._make_local_models_available() def _make_local_models_available(self): diff --git a/pkg/cortex/serve/cortex_internal/lib/model/tree.py b/pkg/cortex/serve/cortex_internal/lib/model/tree.py index 1b5486bdf2..904869a102 100644 --- a/pkg/cortex/serve/cortex_internal/lib/model/tree.py +++ b/pkg/cortex/serve/cortex_internal/lib/model/tree.py @@ -424,8 +424,8 @@ def find_all_cloud_models( is_dir_used: Whether predictor:models:dir is used or not. models_dir: The value of predictor:models:dir in case it's present. Ignored when not required. predictor_type: The predictor type. 
- cloud_paths: The cloud model paths as they are specified in predictor:model_path/predictor:models:dir/predictor:models:paths is used. Ignored when not required. - cloud_model_names: The cloud model names as they are specified in predictor:models:paths:name when predictor:models:paths is used or the default name of the model when predictor:model_path is used. Ignored when not required. + cloud_paths: The cloud model paths as they are specified in predictor:models:path, predictor:models:paths, or predictor:models:dir. Ignored when not required. + cloud_model_names: The cloud model names as they are specified in predictor:models:paths:name when predictor:models:paths is used or the default name of the model when predictor:models:path is used. Ignored when not required. Returns: The tuple with the following elements: model_names - a list with the names of the models (i.e. bert, gpt-2, etc) and they are unique diff --git a/pkg/cortex/serve/cortex_internal/lib/model/type.py b/pkg/cortex/serve/cortex_internal/lib/model/type.py index 75a43ab0d0..d2164767ea 100644 --- a/pkg/cortex/serve/cortex_internal/lib/model/type.py +++ b/pkg/cortex/serve/cortex_internal/lib/model/type.py @@ -17,7 +17,7 @@ import cortex_internal.consts from cortex_internal.lib.model import find_all_cloud_models -from cortex_internal.lib.type import predictor_type_from_api_spec +from cortex_internal.lib.type import predictor_type_from_api_spec, PythonPredictorType class CuratedModelResources: @@ -26,7 +26,7 @@ def __init__(self, curated_model_resources: List[dict]): An example of curated_model_resources object: [ { - 'model_path': 's3://cortex-examples/models/tensorflow/transformer/', + 'path': 's3://cortex-examples/models/tensorflow/transformer/', 'name': 'modelB', 's3_path': True, 'gs_path': False, @@ -77,7 +77,7 @@ def get_versions_for(self, name: str) -> Optional[List[str]]: Get versions for a given model name. Args: - name: Name of the model (_cortex_default for predictor:model_path) or predictor:models:paths:name. + name: Name of the model (_cortex_default for predictor:models:path) or predictor:models:paths:name. Returns: Versions for a given model. None if the model wasn't found. @@ -96,7 +96,7 @@ def get_versions_for(self, name: str) -> Optional[List[str]]: def get_local_model_names(self) -> List[str]: """ - Get locally-provided models as specified with predictor:model_path, predictor:models:paths or predictor:models:dir. + Get locally-provided models as specified with predictor:models:path, predictor:models:paths or predictor:models:dir. Returns: A list of names of all local models. @@ -110,7 +110,7 @@ def get_local_model_names(self) -> List[str]: def get_cloud_model_names(self) -> List[str]: """ - Get cloud-provided models as specified with predictor:model_path or predictor:models:paths. + Get cloud-provided models as specified with predictor:models:path or predictor:models:paths. Returns: A list of names of all models available from the cloud bucket(s). @@ -147,74 +147,78 @@ def get_models_from_api_spec( api_spec: dict, model_dir: str = "/mnt/model" ) -> CuratedModelResources: """ - Only effective for predictor:model_path, predictor:models:paths or for predictor:models:dir when the dir is a local path. - It does not apply for when predictor:models:dir is set to an S3 model path. + Only effective for the models:path, models:paths, or models:dir fields when the dir is a local path. + It does not apply when the models:dir field is set to an S3 model path. 
""" + predictor_type = predictor_type_from_api_spec(api_spec) - predictor = api_spec["predictor"] - - if not predictor["model_path"] and not predictor["models"]: + if predictor_type == PythonPredictorType and api_spec["predictor"]["multi_model_reloading"]: + models_spec = api_spec["predictor"]["multi_model_reloading"] + elif predictor_type != PythonPredictorType: + models_spec = api_spec["predictor"]["models"] + else: return CuratedModelResources([]) - predictor_type = predictor_type_from_api_spec(api_spec) + if not models_spec["path"] and len(models_spec["paths"]) == 0: + return CuratedModelResources([]) - # for predictor.model_path + # for models.path models = [] - if predictor["model_path"]: + if models_spec["path"]: model = { "name": cortex_internal.consts.SINGLE_MODEL_NAME, - "model_path": predictor["model_path"], - "signature_key": predictor["signature_key"], + "path": models_spec["path"], + "signature_key": models_spec["signature_key"], } models.append(model) - # for predictor.models.paths - if predictor["models"] and predictor["models"]["paths"]: - for model in predictor["models"]["paths"]: + # for models.paths + if models_spec["paths"]: + for model in models_spec["paths"]: models.append( { "name": model["name"], - "model_path": model["model_path"], + "path": model["path"], "signature_key": model["signature_key"], } ) - # building model resources for predictor.model_path or predictor.models.paths + # building model resources for models.path or models.paths model_resources = [] for model in models: model_resource = {} model_resource["name"] = model["name"] - model_resource["s3_path"] = model["model_path"].startswith("s3://") - model_resource["gcs_path"] = model["model_path"].startswith("gs://") + model_resource["s3_path"] = model["path"].startswith("s3://") + model_resource["gcs_path"] = model["path"].startswith("gs://") model_resource["local_path"] = ( not model_resource["s3_path"] and not model_resource["gcs_path"] ) - if not model["signature_key"] and predictor["models"]: - model_resource["signature_key"] = predictor["models"]["signature_key"] + if not model["signature_key"]: + model_resource["signature_key"] = models_spec["signature_key"] else: model_resource["signature_key"] = model["signature_key"] if model_resource["s3_path"] or model_resource["gcs_path"]: - model_resource["model_path"] = model["model_path"] + model_resource["path"] = model["path"] _, versions, _, _, _, _, _ = find_all_cloud_models( - False, "", predictor_type, [model_resource["model_path"]], [model_resource["name"]] + False, "", predictor_type, [model_resource["path"]], [model_resource["name"]] ) if model_resource["name"] not in versions: continue model_resource["versions"] = versions[model_resource["name"]] else: - model_resource["model_path"] = os.path.join(model_dir, model_resource["name"]) - model_resource["versions"] = os.listdir(model_resource["model_path"]) + model_resource["path"] = os.path.join(model_dir, model_resource["name"]) + model_resource["versions"] = os.listdir(model_resource["path"]) model_resources.append(model_resource) - # building model resources for predictor.models.dir + # building model resources for models.dir if ( - predictor["models"] - and predictor["models"]["dir"] - and not predictor["models"]["dir"].startswith("s3://") - and not predictor["models"]["dir"].startswith("gs://") + models_spec + and models_spec["dir"] + and not models_spec["dir"].startswith("s3://") + and not models_spec["dir"].startswith("gs://") ): for model_name in os.listdir(model_dir): model_resource = {} @@ 
-222,9 +226,9 @@ def get_models_from_api_spec( model_resource["s3_path"] = False model_resource["gcs_path"] = False model_resource["local_path"] = True - model_resource["signature_key"] = predictor["models"]["signature_key"] - model_resource["model_path"] = os.path.join(model_dir, model_name) - model_resource["versions"] = os.listdir(model_resource["model_path"]) + model_resource["signature_key"] = models_spec["signature_key"] + model_resource["path"] = os.path.join(model_dir, model_name) + model_resource["versions"] = os.listdir(model_resource["path"]) model_resources.append(model_resource) return CuratedModelResources(model_resources) diff --git a/pkg/cortex/serve/cortex_internal/lib/model/validation.py b/pkg/cortex/serve/cortex_internal/lib/model/validation.py index 18cf2cf193..607262cc17 100644 --- a/pkg/cortex/serve/cortex_internal/lib/model/validation.py +++ b/pkg/cortex/serve/cortex_internal/lib/model/validation.py @@ -157,7 +157,7 @@ class ModelVersion(IntEnum): PROVIDED = 2 # for models provided with version directories (1, 2, 452, etc). -# to be used when predictor:model_path, predictor:models:paths or predictor:models:dir is used +# to be used when predictor:models:path, predictor:models:paths or predictor:models:dir is used ModelTemplate = { PythonPredictorType: { OneOfAllPlaceholder(ModelVersion.PROVIDED): { @@ -240,7 +240,7 @@ def json_model_template_representation(model_template) -> dict: def _single_model_pattern(predictor_type: PredictorType) -> dict: """ - To be used when predictor:model_path or predictor:models:paths in cortex.yaml is used. + To be used when predictor:models:path or predictor:models:paths in cortex.yaml is used. """ return ModelTemplate[predictor_type] @@ -289,7 +289,7 @@ def validate_model_paths( paths: List[str], predictor_type: PredictorType, common_prefix: str ) -> List[int]: """ - To be used when predictor:model_path or predictor:models:paths in cortex.yaml is used. + To be used when predictor:models:path or predictor:models:paths in cortex.yaml is used. Args: paths: A list of all paths for a given cloud/local prefix. Must be the top directory of a model. diff --git a/pkg/cortex/serve/init/script.py b/pkg/cortex/serve/init/script.py index 110d7ec67e..ec031574e7 100644 --- a/pkg/cortex/serve/init/script.py +++ b/pkg/cortex/serve/init/script.py @@ -72,23 +72,29 @@ def are_models_specified(api_spec: dict) -> bool: Args: api_spec: API configuration. 
""" - if api_spec["predictor"]["model_path"] is not None: - return True + predictor_type = predictor_type_from_api_spec(api_spec) + + if predictor_type == PythonPredictorType and api_spec["predictor"]["multi_model_reloading"]: + models = api_spec["predictor"]["multi_model_reloading"] + elif predictor_type != PythonPredictorType: + models = api_spec["predictor"]["models"] + else: + return False - if api_spec["predictor"]["models"] and ( - api_spec["predictor"]["models"]["dir"] is not None - or len(api_spec["predictor"]["models"]["paths"]) > 0 - ): - return True - return False + return models is not None def is_model_caching_enabled(api_spec: dir) -> bool: - return ( - api_spec["predictor"]["models"] - and api_spec["predictor"]["models"]["cache_size"] is not None - and api_spec["predictor"]["models"]["disk_cache_size"] is not None - ) + predictor_type = predictor_type_from_api_spec(api_spec) + + if predictor_type == PythonPredictorType and api_spec["predictor"]["multi_model_reloading"]: + models = api_spec["predictor"]["multi_model_reloading"] + elif predictor_type != PythonPredictorType: + models = api_spec["predictor"]["models"] + else: + return False + + return models and models["cache_size"] and models["disk_cache_size"] def main(): diff --git a/pkg/operator/operator/k8s.go b/pkg/operator/operator/k8s.go index 5a44f26c8b..6d3e4774dc 100644 --- a/pkg/operator/operator/k8s.go +++ b/pkg/operator/operator/k8s.go @@ -410,7 +410,7 @@ func getEnvVars(api *spec.API, container string) []kcore.EnvVar { ) } - if api.Predictor.ModelPath != nil || api.Predictor.Models != nil { + if api.Predictor.Type != userconfig.PythonPredictorType || api.Predictor.MultiModelReloading != nil { envVars = append(envVars, kcore.EnvVar{ Name: "CORTEX_MODEL_DIR", diff --git a/pkg/types/spec/errors.go b/pkg/types/spec/errors.go index 730676367e..94caec0966 100644 --- a/pkg/types/spec/errors.go +++ b/pkg/types/spec/errors.go @@ -37,6 +37,7 @@ const ( ErrDuplicateEndpointInOneDeploy = "spec.duplicate_endpoint_in_one_deploy" ErrDuplicateEndpoint = "spec.duplicate_endpoint" ErrConflictingFields = "spec.conflicting_fields" + ErrSpecifyOnlyOneField = "spec.specify_only_one_field" ErrSpecifyOneOrTheOther = "spec.specify_one_or_the_other" ErrSpecifyAllOrNone = "spec.specify_all_or_none" ErrOneOfPrerequisitesNotDefined = "spec.one_of_prerequisites_not_defined" @@ -65,7 +66,6 @@ const ( ErrInvalidTensorFlowModelPath = "spec.invalid_tensorflow_model_path" ErrInvalidONNXModelPath = "spec.invalid_onnx_model_path" - ErrMissingModel = "spec.missing_model" ErrDuplicateModelNames = "spec.duplicate_model_names" ErrReservedModelName = "spec.reserved_model_name" @@ -148,6 +148,13 @@ func ErrorConflictingFields(fieldKeyA, fieldKeyB string) error { }) } +func ErrorSpecifyOnlyOneField(fields ...string) error { + return errors.WithStack(&errors.Error{ + Kind: ErrSpecifyOnlyOneField, + Message: fmt.Sprintf("please specify only one of the following fields %s", s.UserStrsOr(fields)), + }) +} + func ErrorSpecifyOneOrTheOther(fieldKeyA, fieldKeyB string) error { return errors.WithStack(&errors.Error{ Kind: ErrSpecifyOneOrTheOther, @@ -431,13 +438,6 @@ func ErrorInvalidONNXModelPath(modelPath string, modelSubPaths []string) error { }) } -func ErrorMissingModel(predictorType userconfig.PredictorType) error { - return errors.WithStack(&errors.Error{ - Kind: ErrMissingModel, - Message: fmt.Sprintf("at least one model must be specified for the %s predictor type; use fields %s.%s or %s.%s to add model(s)", predictorType, userconfig.PredictorKey, 
userconfig.ModelPathKey, userconfig.PredictorKey, userconfig.ModelsKey), - }) -} - func ErrorDuplicateModelNames(duplicateModel string) error { return errors.WithStack(&errors.Error{ Kind: ErrDuplicateModelNames, diff --git a/pkg/types/spec/utils.go b/pkg/types/spec/utils.go index a478070cc1..3eaa1b5379 100644 --- a/pkg/types/spec/utils.go +++ b/pkg/types/spec/utils.go @@ -85,19 +85,31 @@ func surgeOrUnavailableValidator(str string) (string, error) { return str, nil } -func getErrorForPredictorType(api userconfig.API, modelPrefix string, modelPaths []string) error { - switch api.Predictor.Type { - case userconfig.PythonPredictorType: - return ErrorInvalidPythonModelPath(modelPrefix, modelPaths) - case userconfig.ONNXPredictorType: - return ErrorInvalidONNXModelPath(modelPrefix, modelPaths) - case userconfig.TensorFlowPredictorType: - return ErrorInvalidTensorFlowModelPath(modelPrefix, api.Compute.Inf > 0, modelPaths) +type errorForPredictorTypeFn func(string, []string) error + +func generateErrorForPredictorTypeFn(api *userconfig.API) errorForPredictorTypeFn { + return func(modelPrefix string, modelPaths []string) error { + switch api.Predictor.Type { + case userconfig.PythonPredictorType: + return ErrorInvalidPythonModelPath(modelPrefix, modelPaths) + case userconfig.ONNXPredictorType: + return ErrorInvalidONNXModelPath(modelPrefix, modelPaths) + case userconfig.TensorFlowPredictorType: + return ErrorInvalidTensorFlowModelPath(modelPrefix, api.Compute.Inf > 0, modelPaths) + } + return nil } - return nil } -func validateDirModels(modelPath string, api userconfig.API, projectDir string, awsClient *aws.Client, gcpClient *gcp.Client, extraValidators []modelValidator) ([]CuratedModelResource, error) { +func validateDirModels( + modelPath string, + signatureKey *string, + projectDir string, + awsClient *aws.Client, + gcpClient *gcp.Client, + errorForPredictorType errorForPredictorTypeFn, + extraValidators []modelValidator) ([]CuratedModelResource, error) { + var bucket string var dirPrefix string var modelDirPaths []string @@ -152,7 +164,7 @@ func validateDirModels(modelPath string, api userconfig.API, projectDir string, } } if len(modelDirPaths) == 0 { - return nil, getErrorForPredictorType(api, dirPrefix, modelDirPaths) + return nil, errorForPredictorType(dirPrefix, modelDirPaths) } modelNames := []string{} @@ -170,7 +182,7 @@ func validateDirModels(modelPath string, api userconfig.API, projectDir string, modelStructureType := determineBaseModelStructure(modelDirPaths, modelPrefix) if modelStructureType == userconfig.UnknownModelStructureType { - return nil, errors.Wrap(getErrorForPredictorType(api, modelPrefix, nil), modelName) + return nil, errors.Wrap(errorForPredictorType(modelPrefix, nil), modelName) } var versions []string @@ -215,8 +227,8 @@ func validateDirModels(modelPath string, api userconfig.API, projectDir string, modelResources[i] = CuratedModelResource{ ModelResource: &userconfig.ModelResource{ Name: modelName, - ModelPath: fullModelPath, - SignatureKey: api.Predictor.SignatureKey, + Path: fullModelPath, + SignatureKey: signatureKey, }, S3Path: s3Path, GCSPath: gcsPath, @@ -228,7 +240,15 @@ func validateDirModels(modelPath string, api userconfig.API, projectDir string, return modelResources, nil } -func validateModels(models []userconfig.ModelResource, api userconfig.API, projectDir string, awsClient *aws.Client, gcpClient *gcp.Client, extraValidators []modelValidator) ([]CuratedModelResource, error) { +func validateModels( + models []userconfig.ModelResource, + 
defaultSignatureKey *string, + projectDir string, + awsClient *aws.Client, + gcpClient *gcp.Client, + errorForPredictorType errorForPredictorTypeFn, + extraValidators []modelValidator) ([]CuratedModelResource, error) { + var bucket string var modelPrefix string var modelPaths []string @@ -236,19 +256,19 @@ func validateModels(models []userconfig.ModelResource, api userconfig.API, proje modelResources := make([]CuratedModelResource, len(models)) for i, model := range models { - modelPath := s.EnsureSuffix(model.ModelPath, "/") + modelPath := s.EnsureSuffix(model.Path, "/") - s3Path := strings.HasPrefix(model.ModelPath, "s3://") - gcsPath := strings.HasPrefix(model.ModelPath, "gs://") + s3Path := strings.HasPrefix(model.Path, "s3://") + gcsPath := strings.HasPrefix(model.Path, "gs://") localPath := !s3Path && !gcsPath if s3Path { - awsClientForBucket, err := aws.NewFromClientS3Path(model.ModelPath, awsClient) + awsClientForBucket, err := aws.NewFromClientS3Path(model.Path, awsClient) if err != nil { return nil, errors.Wrap(err, model.Name) } - bucket, modelPrefix, err = aws.SplitS3Path(model.ModelPath) + bucket, modelPrefix, err = aws.SplitS3Path(model.Path) if err != nil { return nil, errors.Wrap(err, model.Name) } @@ -262,7 +282,7 @@ func validateModels(models []userconfig.ModelResource, api userconfig.API, proje } if gcsPath { - bucket, modelPrefix, err = gcp.SplitGCSPath(model.ModelPath) + bucket, modelPrefix, err = gcp.SplitGCSPath(model.Path) if err != nil { return nil, errors.Wrap(err, model.Name) } @@ -275,7 +295,7 @@ func validateModels(models []userconfig.ModelResource, api userconfig.API, proje } if localPath { - expandedLocalPath := files.RelToAbsPath(model.ModelPath, projectDir) + expandedLocalPath := files.RelToAbsPath(model.Path, projectDir) modelPrefix = s.EnsureSuffix(expandedLocalPath, "/") err := files.CheckDir(modelPrefix) @@ -289,7 +309,7 @@ func validateModels(models []userconfig.ModelResource, api userconfig.API, proje } } if len(modelPaths) == 0 { - return nil, errors.Wrap(getErrorForPredictorType(api, modelPrefix, modelPaths), model.Name) + return nil, errors.Wrap(errorForPredictorType(modelPrefix, modelPaths), model.Name) } modelStructureType := determineBaseModelStructure(modelPaths, modelPrefix) @@ -328,8 +348,8 @@ func validateModels(models []userconfig.ModelResource, api userconfig.API, proje var signatureKey *string if model.SignatureKey != nil { signatureKey = model.SignatureKey - } else if api.Predictor.Models != nil && api.Predictor.Models.SignatureKey != nil { - signatureKey = api.Predictor.SignatureKey + } else if defaultSignatureKey != nil { + signatureKey = defaultSignatureKey } fullModelPath := "" @@ -346,7 +366,7 @@ func validateModels(models []userconfig.ModelResource, api userconfig.API, proje modelResources[i] = CuratedModelResource{ ModelResource: &userconfig.ModelResource{ Name: model.Name, - ModelPath: fullModelPath, + Path: fullModelPath, SignatureKey: signatureKey, }, S3Path: s3Path, diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go index 6b0da0e7d6..9e803e6e54 100644 --- a/pkg/types/spec/validations.go +++ b/pkg/types/spec/validations.go @@ -158,12 +158,6 @@ func predictorValidation() *cr.StructFieldValidation { Required: true, }, }, - { - StructField: "ModelPath", - StringPtrValidation: &cr.StringPtrValidation{ - Required: false, - }, - }, { StructField: "PythonPath", StringPtrValidation: &cr.StringPtrValidation{ @@ -227,11 +221,8 @@ func predictorValidation() *cr.StructFieldValidation { AllowEmpty: true, }, }, - { - 
StructField: "SignatureKey", - StringPtrValidation: &cr.StringPtrValidation{}, - }, - multiModelValidation(), + multiModelValidation("Models"), + multiModelValidation("MultiModelReloading"), serverSideBatchingValidation(), }, }, @@ -608,13 +599,19 @@ func updateStrategyValidation(provider types.ProviderType) *cr.StructFieldValida } } -func multiModelValidation() *cr.StructFieldValidation { +func multiModelValidation(fieldName string) *cr.StructFieldValidation { return &cr.StructFieldValidation{ - StructField: "Models", + StructField: fieldName, StructValidation: &cr.StructValidation{ Required: false, DefaultNil: true, StructFieldValidations: []*cr.StructFieldValidation{ + { + StructField: "Path", + StringPtrValidation: &cr.StringPtrValidation{ + Required: false, + }, + }, multiModelPathsValidation(), { StructField: "Dir", @@ -665,7 +662,7 @@ func multiModelPathsValidation() *cr.StructFieldValidation { }, }, { - StructField: "ModelPath", + StructField: "Path", StringValidation: &cr.StringValidation{ Required: true, AllowEmpty: false, @@ -858,13 +855,8 @@ func validatePredictor( ) error { predictor := api.Predictor - if predictor.Models != nil && predictor.ModelPath != nil { - return ErrorConflictingFields(userconfig.ModelPathKey, userconfig.ModelsKey) - } - if predictor.Models != nil { - if err := validateMultiModelsFields(api); err != nil { - return err - } + if err := validateMultiModelsFields(api); err != nil { + return err } switch predictor.Type { @@ -886,6 +878,10 @@ func validatePredictor( } if api.Kind == userconfig.BatchAPIKind { + if predictor.MultiModelReloading != nil { + return ErrorKeyIsNotSupportedForKind(userconfig.MultiModelReloadingKey, userconfig.BatchAPIKind) + } + if predictor.ServerSideBatching != nil { return ErrorKeyIsNotSupportedForKind(userconfig.ServerSideBatchingKey, userconfig.BatchAPIKind) } @@ -925,28 +921,59 @@ func validatePredictor( func validateMultiModelsFields(api *userconfig.API) error { predictor := api.Predictor - if len(predictor.Models.Paths) == 0 && predictor.Models.Dir == nil { - return errors.Wrap(ErrorSpecifyOneOrTheOther(userconfig.ModelsPathsKey, userconfig.ModelsDirKey), userconfig.ModelsKey) + var models *userconfig.MultiModels + if api.Predictor.Models != nil { + if api.Predictor.Type == userconfig.PythonPredictorType { + return ErrorFieldNotSupportedByPredictorType(userconfig.ModelsKey, api.Predictor.Type) + } + models = api.Predictor.Models + } + if api.Predictor.MultiModelReloading != nil { + if api.Predictor.Type != userconfig.PythonPredictorType { + return ErrorFieldNotSupportedByPredictorType(userconfig.MultiModelReloadingKey, api.Predictor.Type) + } + models = api.Predictor.MultiModelReloading + } + + if models == nil { + if api.Predictor.Type != userconfig.PythonPredictorType { + return ErrorFieldMustBeDefinedForPredictorType(userconfig.ModelsKey, api.Predictor.Type) + } + return nil + } + + if models.Path == nil && len(models.Paths) == 0 && models.Dir == nil { + return errors.Wrap(ErrorSpecifyOnlyOneField(userconfig.ModelsPathKey, userconfig.ModelsPathsKey, userconfig.ModelsDirKey), userconfig.ModelsKey) } - if len(predictor.Models.Paths) > 0 && predictor.Models.Dir != nil { + if models.Path != nil && len(models.Paths) > 0 && models.Dir != nil { + return errors.Wrap(ErrorSpecifyOnlyOneField(userconfig.ModelsPathKey, userconfig.ModelsPathsKey, userconfig.ModelsDirKey), userconfig.ModelsKey) + } + + if models.Path != nil && len(models.Paths) > 0 { + return errors.Wrap(ErrorConflictingFields(userconfig.ModelsPathKey, 
userconfig.ModelsPathsKey), userconfig.ModelsKey) + } + if models.Dir != nil && len(models.Paths) > 0 { return errors.Wrap(ErrorConflictingFields(userconfig.ModelsPathsKey, userconfig.ModelsDirKey), userconfig.ModelsKey) } + if models.Dir != nil && models.Path != nil { + return errors.Wrap(ErrorConflictingFields(userconfig.ModelsPathKey, userconfig.ModelsDirKey), userconfig.ModelsKey) + } - if predictor.Models.CacheSize != nil && api.Kind != userconfig.RealtimeAPIKind { + if models.CacheSize != nil && api.Kind != userconfig.RealtimeAPIKind { return errors.Wrap(ErrorKeyIsNotSupportedForKind(userconfig.ModelsCacheSizeKey, api.Kind), userconfig.ModelsKey) } - if predictor.Models.DiskCacheSize != nil && api.Kind != userconfig.RealtimeAPIKind { + if models.DiskCacheSize != nil && api.Kind != userconfig.RealtimeAPIKind { return errors.Wrap(ErrorKeyIsNotSupportedForKind(userconfig.ModelsDiskCacheSizeKey, api.Kind), userconfig.ModelsKey) } - if (predictor.Models.CacheSize == nil && predictor.Models.DiskCacheSize != nil) || - (predictor.Models.CacheSize != nil && predictor.Models.DiskCacheSize == nil) { + if (models.CacheSize == nil && models.DiskCacheSize != nil) || + (models.CacheSize != nil && models.DiskCacheSize == nil) { return errors.Wrap(ErrorSpecifyAllOrNone(userconfig.ModelsCacheSizeKey, userconfig.ModelsDiskCacheSizeKey), userconfig.ModelsKey) } - if predictor.Models.CacheSize != nil && predictor.Models.DiskCacheSize != nil { - if *predictor.Models.CacheSize > *predictor.Models.DiskCacheSize { - return errors.Wrap(ErrorConfigGreaterThanOtherConfig(userconfig.ModelsCacheSizeKey, *predictor.Models.CacheSize, userconfig.ModelsDiskCacheSizeKey, *predictor.Models.DiskCacheSize), userconfig.ModelsKey) + if models.CacheSize != nil && models.DiskCacheSize != nil { + if *models.CacheSize > *models.DiskCacheSize { + return errors.Wrap(ErrorConfigGreaterThanOtherConfig(userconfig.ModelsCacheSizeKey, *models.CacheSize, userconfig.ModelsDiskCacheSizeKey, *models.DiskCacheSize), userconfig.ModelsKey) } if predictor.ProcessesPerReplica > 1 { @@ -960,9 +987,10 @@ func validateMultiModelsFields(api *userconfig.API) error { func validatePythonPredictor(api *userconfig.API, models *[]CuratedModelResource, provider types.ProviderType, projectFiles ProjectFiles, awsClient *aws.Client, gcpClient *gcp.Client) error { predictor := api.Predictor - if predictor.SignatureKey != nil { - return ErrorFieldNotSupportedByPredictorType(userconfig.SignatureKeyKey, predictor.Type) + if predictor.Models != nil { + return ErrorFieldNotSupportedByPredictorType(userconfig.ModelsKey, predictor.Type) } + if predictor.ServerSideBatching != nil { if predictor.ServerSideBatching.MaxBatchSize != predictor.ThreadsPerProcess { return ErrorConcurrencyMismatchServerSideBatchingPython( @@ -975,60 +1003,69 @@ func validatePythonPredictor(api *userconfig.API, models *[]CuratedModelResource return ErrorFieldNotSupportedByPredictorType(userconfig.TensorFlowServingImageKey, predictor.Type) } - hasSingleModel := predictor.ModelPath != nil - hasMultiModels := predictor.Models != nil + if predictor.MultiModelReloading == nil { + return nil + } + mmr := predictor.MultiModelReloading + if mmr.SignatureKey != nil { + return errors.Wrap(ErrorFieldNotSupportedByPredictorType(userconfig.ModelsSignatureKeyKey, predictor.Type), userconfig.MultiModelReloadingKey) + } + + hasSingleModel := mmr.Path != nil + hasMultiModels := !hasSingleModel var modelWrapError func(error) error var modelResources []userconfig.ModelResource if hasSingleModel { + 
+        modelWrapError = func(err error) error {
+            return errors.Wrap(err, userconfig.MultiModelReloadingKey, userconfig.ModelsPathKey)
+        }
         modelResources = []userconfig.ModelResource{
             {
-                Name:      consts.SingleModelName,
-                ModelPath: *predictor.ModelPath,
+                Name: consts.SingleModelName,
+                Path: *mmr.Path,
             },
         }
-        *predictor.ModelPath = s.EnsureSuffix(*predictor.ModelPath, "/")
-        modelWrapError = func(err error) error {
-            return errors.Wrap(err, userconfig.ModelPathKey)
-        }
+        *mmr.Path = s.EnsureSuffix(*mmr.Path, "/")
     }
 
     if hasMultiModels {
-        if predictor.Models.SignatureKey != nil {
-            return errors.Wrap(ErrorFieldNotSupportedByPredictorType(userconfig.SignatureKeyKey, predictor.Type), userconfig.ModelsKey)
+        if mmr.SignatureKey != nil {
+            return errors.Wrap(ErrorFieldNotSupportedByPredictorType(userconfig.ModelsSignatureKeyKey, predictor.Type), userconfig.MultiModelReloadingKey)
         }
-        if len(predictor.Models.Paths) > 0 {
+        if len(mmr.Paths) > 0 {
             modelWrapError = func(err error) error {
-                return errors.Wrap(err, userconfig.ModelsKey, userconfig.ModelsPathsKey)
+                return errors.Wrap(err, userconfig.MultiModelReloadingKey, userconfig.ModelsPathsKey)
             }
 
-            for _, path := range predictor.Models.Paths {
+            for _, path := range mmr.Paths {
                 if path.SignatureKey != nil {
                     return errors.Wrap(
-                        ErrorFieldNotSupportedByPredictorType(userconfig.SignatureKeyKey, predictor.Type),
+                        ErrorFieldNotSupportedByPredictorType(userconfig.ModelsSignatureKeyKey, predictor.Type),
+                        userconfig.MultiModelReloadingKey,
                         userconfig.ModelsKey,
                         userconfig.ModelsPathsKey,
                         path.Name,
                     )
                 }
-                (*path).ModelPath = s.EnsureSuffix((*path).ModelPath, "/")
+                (*path).Path = s.EnsureSuffix((*path).Path, "/")
                 modelResources = append(modelResources, *path)
             }
         }
-        if predictor.Models.Dir != nil {
+        if mmr.Dir != nil {
             modelWrapError = func(err error) error {
-                return errors.Wrap(err, userconfig.ModelsKey, userconfig.ModelsDirKey)
+                return errors.Wrap(err, userconfig.MultiModelReloadingKey, userconfig.ModelsDirKey)
             }
         }
     }
 
     var err error
-    if hasMultiModels && predictor.Models.Dir != nil {
-        *models, err = validateDirModels(*predictor.Models.Dir, *api, projectFiles.ProjectDir(), awsClient, gcpClient, nil)
+    if hasMultiModels && mmr.Dir != nil {
+        *models, err = validateDirModels(*mmr.Dir, nil, projectFiles.ProjectDir(), awsClient, gcpClient, generateErrorForPredictorTypeFn(api), nil)
     } else {
-        *models, err = validateModels(modelResources, *api, projectFiles.ProjectDir(), awsClient, gcpClient, nil)
+        *models, err = validateModels(modelResources, nil, projectFiles.ProjectDir(), awsClient, gcpClient, generateErrorForPredictorTypeFn(api), nil)
     }
     if err != nil {
         return modelWrapError(err)
@@ -1061,28 +1098,28 @@ func validateTensorFlowPredictor(api *userconfig.API, models *[]CuratedModelReso
         }
     }
 
-    hasSingleModel := predictor.ModelPath != nil
-    hasMultiModels := predictor.Models != nil
-
-    if !hasSingleModel && !hasMultiModels {
-        return ErrorMissingModel(predictor.Type)
+    if predictor.MultiModelReloading != nil {
+        return ErrorFieldNotSupportedByPredictorType(userconfig.MultiModelReloadingKey, predictor.Type)
     }
 
+    hasSingleModel := predictor.Models.Path != nil
+    hasMultiModels := !hasSingleModel
+
     var modelWrapError func(error) error
     var modelResources []userconfig.ModelResource
 
     if hasSingleModel {
+        modelWrapError = func(err error) error {
+            return errors.Wrap(err, userconfig.ModelsPathKey)
+        }
         modelResources = []userconfig.ModelResource{
             {
                 Name:         consts.SingleModelName,
-                ModelPath:    *predictor.ModelPath,
-                SignatureKey: predictor.SignatureKey,
+                Path:         *predictor.Models.Path,
+                SignatureKey: predictor.Models.SignatureKey,
             },
         }
-        *predictor.ModelPath = s.EnsureSuffix(*predictor.ModelPath, "/")
-        modelWrapError = func(err error) error {
-            return errors.Wrap(err, userconfig.ModelPathKey)
-        }
+        *predictor.Models.Path = s.EnsureSuffix(*predictor.Models.Path, "/")
     }
 
     if hasMultiModels {
         if len(predictor.Models.Paths) > 0 {
@@ -1094,7 +1131,7 @@ func validateTensorFlowPredictor(api *userconfig.API, models *[]CuratedModelReso
                 if path.SignatureKey == nil && predictor.Models.SignatureKey != nil {
                     path.SignatureKey = predictor.Models.SignatureKey
                 }
-                (*path).ModelPath = s.EnsureSuffix((*path).ModelPath, "/")
+                (*path).Path = s.EnsureSuffix((*path).Path, "/")
                 modelResources = append(modelResources, *path)
             }
         }
@@ -1115,9 +1152,9 @@ func validateTensorFlowPredictor(api *userconfig.API, models *[]CuratedModelReso
     var err error
     if hasMultiModels && predictor.Models.Dir != nil {
-        *models, err = validateDirModels(*predictor.Models.Dir, *api, projectFiles.ProjectDir(), awsClient, gcpClient, validators)
+        *models, err = validateDirModels(*predictor.Models.Dir, predictor.Models.SignatureKey, projectFiles.ProjectDir(), awsClient, gcpClient, generateErrorForPredictorTypeFn(api), validators)
     } else {
-        *models, err = validateModels(modelResources, *api, projectFiles.ProjectDir(), awsClient, gcpClient, validators)
+        *models, err = validateModels(modelResources, predictor.Models.SignatureKey, projectFiles.ProjectDir(), awsClient, gcpClient, generateErrorForPredictorTypeFn(api), validators)
     }
     if err != nil {
         return modelWrapError(err)
@@ -1141,8 +1178,8 @@ func validateTensorFlowPredictor(api *userconfig.API, models *[]CuratedModelReso
 func validateONNXPredictor(api *userconfig.API, models *[]CuratedModelResource, provider types.ProviderType, projectFiles ProjectFiles, awsClient *aws.Client, gcpClient *gcp.Client) error {
     predictor := api.Predictor
 
-    if predictor.SignatureKey != nil {
-        return ErrorFieldNotSupportedByPredictorType(userconfig.SignatureKeyKey, predictor.Type)
+    if predictor.Models.SignatureKey != nil {
+        return errors.Wrap(ErrorFieldNotSupportedByPredictorType(userconfig.ModelsSignatureKeyKey, predictor.Type), userconfig.ModelsKey)
     }
     if predictor.ServerSideBatching != nil {
         return ErrorFieldNotSupportedByPredictorType(userconfig.ServerSideBatchingKey, predictor.Type)
@@ -1151,33 +1188,29 @@ func validateONNXPredictor(api *userconfig.API, models *[]CuratedModelResource,
         return ErrorFieldNotSupportedByPredictorType(userconfig.TensorFlowServingImageKey, predictor.Type)
     }
 
-    hasSingleModel := predictor.ModelPath != nil
-    hasMultiModels := predictor.Models != nil
-
-    if !hasSingleModel && !hasMultiModels {
-        return ErrorMissingModel(predictor.Type)
+    if predictor.MultiModelReloading != nil {
+        return ErrorFieldNotSupportedByPredictorType(userconfig.MultiModelReloadingKey, predictor.Type)
     }
 
+    hasSingleModel := predictor.Models.Path != nil
+    hasMultiModels := !hasSingleModel
+
     var modelWrapError func(error) error
     var modelResources []userconfig.ModelResource
 
     if hasSingleModel {
+        modelWrapError = func(err error) error {
+            return errors.Wrap(err, userconfig.ModelsPathKey)
+        }
         modelResources = []userconfig.ModelResource{
             {
-                Name:      consts.SingleModelName,
-                ModelPath: *predictor.ModelPath,
+                Name: consts.SingleModelName,
+                Path: *predictor.Models.Path,
             },
         }
-        *predictor.ModelPath = s.EnsureSuffix(*predictor.ModelPath, "/")
-        modelWrapError = func(err error) error {
-            return errors.Wrap(err, userconfig.ModelPathKey)
-        }
+        *predictor.Models.Path = s.EnsureSuffix(*predictor.Models.Path, "/")
     }
 
     if hasMultiModels {
-        if predictor.Models.SignatureKey != nil {
-            return errors.Wrap(ErrorFieldNotSupportedByPredictorType(userconfig.SignatureKeyKey, predictor.Type), userconfig.ModelsKey)
-        }
-
         if len(predictor.Models.Paths) > 0 {
             modelWrapError = func(err error) error {
                 return errors.Wrap(err, userconfig.ModelsKey, userconfig.ModelsPathsKey)
@@ -1186,13 +1219,13 @@ func validateONNXPredictor(api *userconfig.API, models *[]CuratedModelResource,
             for _, path := range predictor.Models.Paths {
                 if path.SignatureKey != nil {
                     return errors.Wrap(
-                        ErrorFieldNotSupportedByPredictorType(userconfig.SignatureKeyKey, predictor.Type),
+                        ErrorFieldNotSupportedByPredictorType(userconfig.ModelsSignatureKeyKey, predictor.Type),
                         userconfig.ModelsKey,
                         userconfig.ModelsPathsKey,
                         path.Name,
                     )
                 }
-                (*path).ModelPath = s.EnsureSuffix((*path).ModelPath, "/")
+                (*path).Path = s.EnsureSuffix((*path).Path, "/")
                 modelResources = append(modelResources, *path)
             }
         }
@@ -1208,9 +1241,9 @@ func validateONNXPredictor(api *userconfig.API, models *[]CuratedModelResource,
     var err error
     if hasMultiModels && predictor.Models.Dir != nil {
-        *models, err = validateDirModels(*predictor.Models.Dir, *api, projectFiles.ProjectDir(), awsClient, gcpClient, validators)
+        *models, err = validateDirModels(*predictor.Models.Dir, nil, projectFiles.ProjectDir(), awsClient, gcpClient, generateErrorForPredictorTypeFn(api), validators)
     } else {
-        *models, err = validateModels(modelResources, *api, projectFiles.ProjectDir(), awsClient, gcpClient, validators)
+        *models, err = validateModels(modelResources, nil, projectFiles.ProjectDir(), awsClient, gcpClient, generateErrorForPredictorTypeFn(api), validators)
     }
     if err != nil {
         return modelWrapError(err)
diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go
index 63dd2bb0f4..9eb06dcaad 100644
--- a/pkg/types/userconfig/api.go
+++ b/pkg/types/userconfig/api.go
@@ -45,11 +45,12 @@ type API struct {
 }
 
 type Predictor struct {
-    Type         PredictorType `json:"type" yaml:"type"`
-    Path         string        `json:"path" yaml:"path"`
-    ModelPath    *string       `json:"model_path" yaml:"model_path"`
-    SignatureKey *string       `json:"signature_key" yaml:"signature_key"`
-    Models       *MultiModels  `json:"models" yaml:"models"`
+    Type PredictorType `json:"type" yaml:"type"`
+    Path string        `json:"path" yaml:"path"`
+
+    MultiModelReloading *MultiModels `json:"multi_model_reloading" yaml:"multi_model_reloading"`
+    Models              *MultiModels `json:"models" yaml:"models"`
+
     ServerSideBatching  *ServerSideBatching `json:"server_side_batching" yaml:"server_side_batching"`
     ProcessesPerReplica int32               `json:"processes_per_replica" yaml:"processes_per_replica"`
     ThreadsPerProcess   int32               `json:"threads_per_process" yaml:"threads_per_process"`
@@ -61,6 +62,7 @@ type Predictor struct {
 }
 
 type MultiModels struct {
+    Path          *string          `json:"path" yaml:"path"`
     Paths         []*ModelResource `json:"paths" yaml:"paths"`
     Dir           *string          `json:"dir" yaml:"dir"`
     CacheSize     *int32           `json:"cache_size" yaml:"cache_size"`
@@ -75,7 +77,7 @@ type TrafficSplit struct {
 
 type ModelResource struct {
     Name         string  `json:"name" yaml:"name"`
-    ModelPath    string  `json:"model_path" yaml:"model_path"`
+    Path         string  `json:"path" yaml:"path"`
     SignatureKey *string `json:"signature_key" yaml:"signature_key"`
 }
 
@@ -364,15 +366,13 @@ func (predictor *Predictor) UserStr() string {
     sb.WriteString(fmt.Sprintf("%s: %s\n", TypeKey, predictor.Type))
     sb.WriteString(fmt.Sprintf("%s: %s\n", PathKey, predictor.Path))
 
-    if predictor.ModelPath != nil {
-        sb.WriteString(fmt.Sprintf("%s: %s\n", ModelPathKey, *predictor.ModelPath))
-    }
-    if predictor.ModelPath == nil && predictor.Models != nil {
+    if predictor.Models != nil {
         sb.WriteString(fmt.Sprintf("%s:\n", ModelsKey))
         sb.WriteString(s.Indent(predictor.Models.UserStr(), "  "))
     }
-    if predictor.SignatureKey != nil {
-        sb.WriteString(fmt.Sprintf("%s: %s\n", SignatureKeyKey, *predictor.SignatureKey))
+    if predictor.MultiModelReloading != nil {
+        sb.WriteString(fmt.Sprintf("%s:\n", MultiModelReloadingKey))
+        sb.WriteString(s.Indent(predictor.MultiModelReloading.UserStr(), "  "))
     }
 
     if predictor.Type == TensorFlowPredictorType && predictor.ServerSideBatching != nil {
@@ -406,15 +406,17 @@ func (predictor *Predictor) UserStr() string {
 func (models *MultiModels) UserStr() string {
     var sb strings.Builder
-    if models.Dir != nil {
-        sb.WriteString(fmt.Sprintf("%s: %s\n", ModelsDirKey, *models.Dir))
-    } else if len(models.Paths) > 0 {
+    if len(models.Paths) > 0 {
         sb.WriteString(fmt.Sprintf("%s:\n", ModelsPathsKey))
         for _, model := range models.Paths {
             modelUserStr := s.Indent(model.UserStr(), "  ")
             modelUserStr = modelUserStr[:2] + "-" + modelUserStr[3:]
             sb.WriteString(modelUserStr)
         }
+    } else if models.Path != nil {
+        sb.WriteString(fmt.Sprintf("%s: %s\n", ModelsPathKey, *models.Path))
+    } else {
+        sb.WriteString(fmt.Sprintf("%s: %s\n", ModelsDirKey, *models.Dir))
     }
     if models.SignatureKey != nil {
         sb.WriteString(fmt.Sprintf("%s: %s\n", ModelsSignatureKeyKey, *models.SignatureKey))
     }
@@ -431,9 +433,9 @@ func (models *MultiModels) UserStr() string {
 func (model *ModelResource) UserStr() string {
     var sb strings.Builder
     sb.WriteString(fmt.Sprintf("%s: %s\n", ModelsNameKey, model.Name))
-    sb.WriteString(fmt.Sprintf("%s: %s\n", ModelPathKey, model.ModelPath))
+    sb.WriteString(fmt.Sprintf("%s: %s\n", ModelsPathKey, model.Path))
     if model.SignatureKey != nil {
-        sb.WriteString(fmt.Sprintf("%s: %s\n", SignatureKeyKey, *model.SignatureKey))
+        sb.WriteString(fmt.Sprintf("%s: %s\n", ModelsSignatureKeyKey, *model.SignatureKey))
     }
     return sb.String()
 }
@@ -629,12 +631,6 @@ func (api *API) TelemetryEvent(provider types.ProviderType) map[string]interface
     event["predictor.processes_per_replica"] = api.Predictor.ProcessesPerReplica
     event["predictor.threads_per_process"] = api.Predictor.ThreadsPerProcess
 
-    if api.Predictor.ModelPath != nil {
-        event["predictor.model_path._is_defined"] = true
-    }
-    if api.Predictor.SignatureKey != nil {
-        event["predictor.signature_key._is_defined"] = true
-    }
     if api.Predictor.PythonPath != nil {
         event["predictor.python_path._is_defined"] = true
     }
@@ -653,31 +649,42 @@ func (api *API) TelemetryEvent(provider types.ProviderType) map[string]interface
         event["predictor.env._len"] = len(api.Predictor.Env)
     }
 
+    var models *MultiModels
     if api.Predictor.Models != nil {
+        models = api.Predictor.Models
+    }
+    if api.Predictor.MultiModelReloading != nil {
+        models = api.Predictor.MultiModelReloading
+    }
+
+    if models != nil {
         event["predictor.models._is_defined"] = true
-        if len(api.Predictor.Models.Paths) > 0 {
+        if models.Path != nil {
+            event["predictor.models.path._is_defined"] = true
+        }
+        if len(models.Paths) > 0 {
             event["predictor.models.paths._is_defined"] = true
-            event["predictor.models.paths._len"] = len(api.Predictor.Models.Paths)
+            event["predictor.models.paths._len"] = len(models.Paths)
             var numSignatureKeysDefined int
-            for _, mmPath := range api.Predictor.Models.Paths {
+            for _, mmPath := range models.Paths {
                 if mmPath.SignatureKey != nil {
                     numSignatureKeysDefined++
                 }
             }
             event["predictor.models.paths._num_signature_keys_defined"] = numSignatureKeysDefined
         }
-        if api.Predictor.Models.Dir != nil {
+        if models.Dir != nil {
             event["predictor.models.dir._is_defined"] = true
         }
-        if api.Predictor.Models.CacheSize != nil {
+        if models.CacheSize != nil {
             event["predictor.models.cache_size._is_defined"] = true
-            event["predictor.models.cache_size"] = *api.Predictor.Models.CacheSize
+            event["predictor.models.cache_size"] = *models.CacheSize
         }
-        if api.Predictor.Models.DiskCacheSize != nil {
+        if models.DiskCacheSize != nil {
             event["predictor.models.disk_cache_size._is_defined"] = true
-            event["predictor.models.disk_cache_size"] = *api.Predictor.Models.DiskCacheSize
+            event["predictor.models.disk_cache_size"] = *models.DiskCacheSize
         }
-        if api.Predictor.Models.SignatureKey != nil {
+        if models.SignatureKey != nil {
             event["predictor.models.signature_key._is_defined"] = true
         }
     }
diff --git a/pkg/types/userconfig/config_key.go b/pkg/types/userconfig/config_key.go
index b55fa9d5a1..9c17e5a1f5 100644
--- a/pkg/types/userconfig/config_key.go
+++ b/pkg/types/userconfig/config_key.go
@@ -34,9 +34,7 @@ const (
     // Predictor
     TypeKey                   = "type"
     PathKey                   = "path"
-    ModelPathKey              = "model_path"
     ServerSideBatchingKey     = "server_side_batching"
-    ModelsKey                 = "models"
     PythonPathKey             = "python_path"
     ImageKey                  = "image"
     TensorFlowServingImageKey = "tensorflow_serving_image"
@@ -44,11 +42,16 @@ const (
     ThreadsPerProcessKey      = "threads_per_process"
     ConfigKey                 = "config"
     EnvKey                    = "env"
-    SignatureKeyKey           = "signature_key"
+
+    // MultiModelReloading
+    MultiModelReloadingKey = "multi_model_reloading"
 
     // MultiModels
+    ModelsKey              = "models"
+    ModelsPathKey          = "path"
     ModelsPathsKey         = "paths"
     ModelsDirKey           = "dir"
+    ModelsSignatureKeyKey  = "signature_key"
     ModelsCacheSizeKey     = "cache_size"
     ModelsDiskCacheSizeKey = "disk_cache_size"
diff --git a/test/batch/onnx/cortex.yaml b/test/batch/onnx/cortex.yaml
index 396ed52b15..e5dbcc32cd 100644
--- a/test/batch/onnx/cortex.yaml
+++ b/test/batch/onnx/cortex.yaml
@@ -3,6 +3,7 @@ predictor:
     type: onnx
     path: predictor.py
-    model_path: s3://cortex-examples/image-classifier/alexnet_batch/
+    models:
+      path: s3://cortex-examples/image-classifier/alexnet_batch/
   compute:
     cpu: 1
diff --git a/test/batch/tensorflow/cortex.yaml b/test/batch/tensorflow/cortex.yaml
index 6a38215b1f..389f986708 100644
--- a/test/batch/tensorflow/cortex.yaml
+++ b/test/batch/tensorflow/cortex.yaml
@@ -3,6 +3,7 @@ predictor:
     type: tensorflow
     path: predictor.py
-    model_path: s3://cortex-examples/tensorflow/image-classifier/inception/
+    models:
+      path: s3://cortex-examples/tensorflow/image-classifier/inception/
   compute:
     cpu: 1
diff --git a/test/live-reloading/python/mpg-estimator/cortex.yaml b/test/live-reloading/python/mpg-estimator/cortex.yaml
index 5c85d13841..74d21b3e83 100644
--- a/test/live-reloading/python/mpg-estimator/cortex.yaml
+++ b/test/live-reloading/python/mpg-estimator/cortex.yaml
@@ -3,4 +3,5 @@ predictor:
     type: python
     path: predictor.py
-    model_path: s3://cortex-examples/sklearn/mpg-estimator/linreg/
+    multi_model_reloading:
+      path: s3://cortex-examples/sklearn/mpg-estimator/linreg/
diff --git a/test/model-caching/onnx/multi-model-classifier/cortex.yaml b/test/model-caching/onnx/multi-model-classifier/cortex.yaml
index 1285a3e231..7970d24b5f 100644
--- a/test/model-caching/onnx/multi-model-classifier/cortex.yaml
+++ b/test/model-caching/onnx/multi-model-classifier/cortex.yaml
@@ -6,11 +6,11 @@ models:
       paths:
         - name: resnet50
-          model_path: s3://cortex-examples/onnx/resnet50/
+          path: s3://cortex-examples/onnx/resnet50/
         - name: mobilenet
-          model_path: s3://cortex-examples/onnx/mobilenet/
+          path: s3://cortex-examples/onnx/mobilenet/
         - name: shufflenet
-          model_path: s3://cortex-examples/onnx/shufflenet/
+          path: s3://cortex-examples/onnx/shufflenet/
       cache_size: 2
       disk_cache_size: 3
   config:
diff --git a/test/model-caching/python/mpg-estimator/README.md b/test/model-caching/python/mpg-estimator/README.md
index da5d1bf7b8..9cd05e5200 100644
--- a/test/model-caching/python/mpg-estimator/README.md
+++ b/test/model-caching/python/mpg-estimator/README.md
@@ -27,7 +27,7 @@ export ENDPOINT=your-api-endpoint
 Make a request to version `1` of the `mpg-estimator` model:
 
 ```bash
-curl "${ENDPOINT}?model=resnet50&version=1" -X POST -H "Content-Type: application/json" -d @sample.json
+curl "${ENDPOINT}?version=1" -X POST -H "Content-Type: application/json" -d @sample.json
 ```
 
 The expected response is:
@@ -43,13 +43,13 @@ At this point, there is one model loaded into memory (as specified by `cache_size`)
 Make a request to version `2` of the `mpg-estimator` model:
 
 ```bash
-curl "${ENDPOINT}?model=mobilenet" -X POST -H "Content-Type: application/json" -d @sample.json
+curl "${ENDPOINT}?version=2" -X POST -H "Content-Type: application/json" -d @sample.json
 ```
 
 The expected response is:
 
 ```json
-{"prediction": 26.929889872154185, "model": {"name": "mpg-estimator", "version": "1"}}
+{"prediction": 26.929889872154185, "model": {"name": "mpg-estimator", "version": "2"}}
 ```
 
 ### Version 3
@@ -59,13 +59,13 @@ With the following request, version 2 of the model will have to be evicted from
 Make a request to version `3` of the `mpg-estimator` model:
 
 ```bash
-curl "${ENDPOINT}?model=shufflenet" -X POST -H "Content-Type: application/json" -d @sample.json
+curl "${ENDPOINT}?version=3" -X POST -H "Content-Type: application/json" -d @sample.json
 ```
 
 The expected response is:
 
 ```json
-{"prediction": 26.929889872154185, "model": {"name": "mpg-estimator", "version": "1"}}
+{"prediction": 26.929889872154185, "model": {"name": "mpg-estimator", "version": "3"}}
 ```
 
 ---
diff --git a/test/model-caching/python/mpg-estimator/cortex.yaml b/test/model-caching/python/mpg-estimator/cortex.yaml
index f5521cbc6a..3354f3c655 100644
--- a/test/model-caching/python/mpg-estimator/cortex.yaml
+++ b/test/model-caching/python/mpg-estimator/cortex.yaml
@@ -3,9 +3,9 @@ predictor:
     type: python
     path: predictor.py
-    models:
+    multi_model_reloading:
       paths:
         - name: mpg-estimator
-          model_path: s3://cortex-examples/sklearn/mpg-estimator/linreg/
+          path: s3://cortex-examples/sklearn/mpg-estimator/linreg/
       cache_size: 1
       disk_cache_size: 2
diff --git a/test/model-caching/python/mpg-estimator/predictor.py b/test/model-caching/python/mpg-estimator/predictor.py
index 309c65f649..8fcdb50a5d 100644
--- a/test/model-caching/python/mpg-estimator/predictor.py
+++ b/test/model-caching/python/mpg-estimator/predictor.py
@@ -10,7 +10,7 @@ def load_model(self, model_path):
         return mlflow.sklearn.load_model(model_path)
 
     def predict(self, payload, query_params):
-        model_name = query_params["model"]
+        model_name = "mpg-estimator"
         model_version = query_params.get("version", "latest")
         model = self.client.get_model(model_name, model_version)
diff --git a/test/model-caching/tensorflow/multi-model-classifier/cortex.yaml b/test/model-caching/tensorflow/multi-model-classifier/cortex.yaml
index 8d80ec181f..1587be7ff6 100644
--- a/test/model-caching/tensorflow/multi-model-classifier/cortex.yaml
+++ b/test/model-caching/tensorflow/multi-model-classifier/cortex.yaml
@@ -6,11 +6,11 @@ models:
       paths:
         - name: inception
-          model_path: s3://cortex-examples/tensorflow/image-classifier/inception/
+          path: s3://cortex-examples/tensorflow/image-classifier/inception/
         - name: iris
-          model_path: s3://cortex-examples/tensorflow/iris-classifier/nn/
+          path: s3://cortex-examples/tensorflow/iris-classifier/nn/
         - name: resnet50
-          model_path: s3://cortex-examples/tensorflow/resnet50/
+          path: s3://cortex-examples/tensorflow/resnet50/
       cache_size: 2
       disk_cache_size: 3
   config:
diff --git a/test/onnx/iris-classifier/cortex.yaml b/test/onnx/iris-classifier/cortex.yaml
index d9a111ffbb..e08355b866 100644
--- a/test/onnx/iris-classifier/cortex.yaml
+++ b/test/onnx/iris-classifier/cortex.yaml
@@ -3,6 +3,7 @@ predictor:
     type: onnx
     path: predictor.py
-    model_path: s3://cortex-examples/onnx/iris-classifier/
+    models:
+      path: s3://cortex-examples/onnx/iris-classifier/
   monitoring:
     model_type: classification
diff --git a/test/onnx/multi-model-classifier/cortex.yaml b/test/onnx/multi-model-classifier/cortex.yaml
index 6487ac799a..d6fe4fcc72 100644
--- a/test/onnx/multi-model-classifier/cortex.yaml
+++ b/test/onnx/multi-model-classifier/cortex.yaml
@@ -6,11 +6,11 @@ models:
       paths:
         - name: resnet50
-          model_path: s3://cortex-examples/onnx/resnet50/
+          path: s3://cortex-examples/onnx/resnet50/
         - name: mobilenet
-          model_path: s3://cortex-examples/onnx/mobilenet/
+          path: s3://cortex-examples/onnx/mobilenet/
         - name: shufflenet
-          model_path: s3://cortex-examples/onnx/shufflenet/
+          path: s3://cortex-examples/onnx/shufflenet/
   config:
     image-classifier-classes: https://s3.amazonaws.com/deep-learning-models/image-models/imagenet_class_index.json
     image-resize: 224
diff --git a/test/onnx/yolov5-youtube/cortex.yaml b/test/onnx/yolov5-youtube/cortex.yaml
index e60cc6dc8d..fdd9f2dcc2 100644
--- a/test/onnx/yolov5-youtube/cortex.yaml
+++ b/test/onnx/yolov5-youtube/cortex.yaml
@@ -3,7 +3,8 @@ predictor:
     type: onnx
     path: predictor.py
-    model_path: s3://cortex-examples/onnx/yolov5-youtube/
+    models:
+      path: s3://cortex-examples/onnx/yolov5-youtube/
   config:
     iou_threshold: 0.5
     confidence_threshold: 0.6
diff --git a/test/pytorch/image-classifier-resnet50/cortex_gpu.yaml b/test/pytorch/image-classifier-resnet50/cortex_gpu.yaml
index 28b22f6169..816fc11644 100644
--- a/test/pytorch/image-classifier-resnet50/cortex_gpu.yaml
+++ b/test/pytorch/image-classifier-resnet50/cortex_gpu.yaml
@@ -4,7 +4,7 @@
     type: python
     path: predictor.py
     config:
-      model_path: s3://cortex-examples/pytorch/image-classifier-resnet50
+      path: s3://cortex-examples/pytorch/image-classifier-resnet50
       model_name: resnet50.pt
       device: gpu
       classes: https://s3.amazonaws.com/deep-learning-models/image-models/imagenet_class_index.json
diff --git a/test/tensorflow/image-classifier-inception/cortex.yaml b/test/tensorflow/image-classifier-inception/cortex.yaml
index 3e5abbf2c0..a2d682073f 100644
--- a/test/tensorflow/image-classifier-inception/cortex.yaml
+++ b/test/tensorflow/image-classifier-inception/cortex.yaml
@@ -3,7 +3,8 @@ predictor:
     type: tensorflow
     path: predictor.py
-    model_path: s3://cortex-examples/tensorflow/image-classifier/inception/
+    models:
+      path: s3://cortex-examples/tensorflow/image-classifier/inception/
   monitoring:
     model_type: classification
   compute:
diff --git a/test/tensorflow/image-classifier-inception/cortex_server_side_batching.yaml b/test/tensorflow/image-classifier-inception/cortex_server_side_batching.yaml
index cc09dc4a67..745b6a9678 100644
--- a/test/tensorflow/image-classifier-inception/cortex_server_side_batching.yaml
+++ b/test/tensorflow/image-classifier-inception/cortex_server_side_batching.yaml
@@ -3,7 +3,8 @@ predictor:
     type: tensorflow
     path: predictor.py
-    model_path: s3://cortex-examples/tensorflow/image-classifier/inception/
+    models:
+      path: s3://cortex-examples/tensorflow/image-classifier/inception/
     server_side_batching:
       max_batch_size: 2
       batch_interval: 0.2s
diff --git a/test/tensorflow/image-classifier-resnet50/cortex.yaml b/test/tensorflow/image-classifier-resnet50/cortex.yaml
index 63faee3057..d68cfd468a 100644
--- a/test/tensorflow/image-classifier-resnet50/cortex.yaml
+++ b/test/tensorflow/image-classifier-resnet50/cortex.yaml
@@ -3,7 +3,8 @@ predictor:
     type: tensorflow
     path: predictor.py
-    model_path: s3://cortex-examples/tensorflow/resnet50/
+    models:
+      path: s3://cortex-examples/tensorflow/resnet50/
     processes_per_replica: 4
     threads_per_process: 16
     config:
diff --git a/test/tensorflow/image-classifier-resnet50/cortex_gpu.yaml b/test/tensorflow/image-classifier-resnet50/cortex_gpu.yaml
index f0d20594e8..caf03e9c6c 100644
--- a/test/tensorflow/image-classifier-resnet50/cortex_gpu.yaml
+++ b/test/tensorflow/image-classifier-resnet50/cortex_gpu.yaml
@@ -3,7 +3,8 @@ predictor:
     type: tensorflow
     path: predictor.py
-    model_path: s3://cortex-examples/tensorflow/resnet50/
+    models:
+      path: s3://cortex-examples/tensorflow/resnet50/
     processes_per_replica: 4
     threads_per_process: 24
     config:
diff --git a/test/tensorflow/image-classifier-resnet50/cortex_gpu_server_side_batching.yaml b/test/tensorflow/image-classifier-resnet50/cortex_gpu_server_side_batching.yaml
index 0d1623bf75..824dcaf3df 100644
--- a/test/tensorflow/image-classifier-resnet50/cortex_gpu_server_side_batching.yaml
+++ b/test/tensorflow/image-classifier-resnet50/cortex_gpu_server_side_batching.yaml
@@ -3,7 +3,8 @@ predictor:
     type: tensorflow
     path: predictor.py
-    model_path: s3://cortex-examples/tensorflow/resnet50/
+    models:
+      path: s3://cortex-examples/tensorflow/resnet50/
     server_side_batching:
       max_batch_size: 32
       batch_interval: 0.1s
diff --git a/test/tensorflow/image-classifier-resnet50/cortex_inf.yaml b/test/tensorflow/image-classifier-resnet50/cortex_inf.yaml
index 4e460f013c..70eab4288f 100644
--- a/test/tensorflow/image-classifier-resnet50/cortex_inf.yaml
+++ b/test/tensorflow/image-classifier-resnet50/cortex_inf.yaml
@@ -3,7 +3,8 @@ predictor:
     type: tensorflow
     path: predictor.py
-    model_path: s3://cortex-examples/tensorflow/resnet50_neuron/
+    models:
+      path: s3://cortex-examples/tensorflow/resnet50_neuron/
     processes_per_replica: 4
     threads_per_process: 256
     config:
diff --git a/test/tensorflow/image-classifier-resnet50/cortex_inf_server_side_batching.yaml b/test/tensorflow/image-classifier-resnet50/cortex_inf_server_side_batching.yaml
index 7a1279fcd4..7b40309b0d 100644
--- a/test/tensorflow/image-classifier-resnet50/cortex_inf_server_side_batching.yaml
+++ b/test/tensorflow/image-classifier-resnet50/cortex_inf_server_side_batching.yaml
@@ -3,7 +3,8 @@ predictor:
     type: tensorflow
     path: predictor.py
-    model_path: s3://cortex-examples/tensorflow/resnet50_neuron_batch_size_5/
+    models:
+      path: s3://cortex-examples/tensorflow/resnet50_neuron_batch_size_5/
     server_side_batching:
       max_batch_size: 5
       batch_interval: 0.1s
diff --git a/test/tensorflow/iris-classifier/cortex.yaml b/test/tensorflow/iris-classifier/cortex.yaml
index cccd9761e8..df304caff0 100644
--- a/test/tensorflow/iris-classifier/cortex.yaml
+++ b/test/tensorflow/iris-classifier/cortex.yaml
@@ -3,6 +3,7 @@ predictor:
    type: tensorflow
    path: predictor.py
-   model_path: s3://cortex-examples/tensorflow/iris-classifier/nn/
+   models:
+     path: s3://cortex-examples/tensorflow/iris-classifier/nn/
  monitoring:
    model_type: classification
diff --git a/test/tensorflow/license-plate-reader/cortex_full.yaml b/test/tensorflow/license-plate-reader/cortex_full.yaml
index 1d4e0f11f5..228f6c13fa 100644
--- a/test/tensorflow/license-plate-reader/cortex_full.yaml
+++ b/test/tensorflow/license-plate-reader/cortex_full.yaml
@@ -3,7 +3,8 @@ predictor:
     type: tensorflow
     path: predictor_yolo.py
-    model_path: s3://cortex-examples/tensorflow/license-plate-reader/yolov3_tf/
+    models:
+      path: s3://cortex-examples/tensorflow/license-plate-reader/yolov3_tf/
+      signature_key: serving_default
     processes_per_replica: 4
     threads_per_process: 3
-    signature_key: serving_default
diff --git a/test/tensorflow/multi-model-classifier/cortex.yaml b/test/tensorflow/multi-model-classifier/cortex.yaml
index 5aa9d4939c..105bb47378 100644
--- a/test/tensorflow/multi-model-classifier/cortex.yaml
+++ b/test/tensorflow/multi-model-classifier/cortex.yaml
@@ -6,11 +6,11 @@ models:
       paths:
         - name: inception
-          model_path: s3://cortex-examples/tensorflow/image-classifier/inception/
+          path: s3://cortex-examples/tensorflow/image-classifier/inception/
         - name: iris
-          model_path: s3://cortex-examples/tensorflow/iris-classifier/nn/
+          path: s3://cortex-examples/tensorflow/iris-classifier/nn/
         - name: resnet50
-          model_path: s3://cortex-examples/tensorflow/resnet50/
+          path: s3://cortex-examples/tensorflow/resnet50/
   config:
     models:
       iris:
diff --git a/test/tensorflow/sentiment-analyzer/cortex.yaml b/test/tensorflow/sentiment-analyzer/cortex.yaml
index 66a2c3af9b..62ce78c8b5 100644
--- a/test/tensorflow/sentiment-analyzer/cortex.yaml
+++ b/test/tensorflow/sentiment-analyzer/cortex.yaml
@@ -3,7 +3,8 @@ predictor:
     type: tensorflow
     path: predictor.py
-    model_path: s3://cortex-examples/tensorflow/sentiment-analyzer/bert/
+    models:
+      path: s3://cortex-examples/tensorflow/sentiment-analyzer/bert/
   monitoring:
     model_type: classification
   compute:
diff --git a/test/tensorflow/text-generator/cortex.yaml b/test/tensorflow/text-generator/cortex.yaml
index d8d73621d7..e52e2ff446 100644
--- a/test/tensorflow/text-generator/cortex.yaml
+++ b/test/tensorflow/text-generator/cortex.yaml
@@ -3,7 +3,8 @@ predictor:
     type: tensorflow
     path: predictor.py
-    model_path: s3://cortex-examples/tensorflow/text-generator/gpt-2/124M/
+    models:
+      path: s3://cortex-examples/tensorflow/text-generator/gpt-2/124M/
   compute:
     cpu: 1
     gpu: 1
diff --git a/test/traffic-splitter/cortex.yaml b/test/traffic-splitter/cortex.yaml
index 3db335a56d..9f3b64f894 100644
--- a/test/traffic-splitter/cortex.yaml
+++ b/test/traffic-splitter/cortex.yaml
@@ -13,7 +13,8 @@ predictor:
     type: onnx
     path: onnx_predictor.py
-    model_path: s3://cortex-examples/onnx/iris-classifier/
+    models:
+      path: s3://cortex-examples/onnx/iris-classifier/
   monitoring:
     model_type: classification
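
Taken together, the configuration changes above fold the old top-level `model_path` and `signature_key` predictor fields into a single nested `models` block for TensorFlow and ONNX predictors. The following is a sketch of the resulting schema, not a file from this diff (the `my-bucket` paths and API names are illustrative); exactly one of `path`, `paths`, or `dir` may be set, as `validateMultiModelsFields` now enforces:

```yaml
# Single model: `signature_key` nests under `models` instead of sitting at the predictor level.
- name: iris-classifier
  predictor:
    type: tensorflow
    path: predictor.py
    models:
      path: s3://my-bucket/iris-classifier/nn/  # illustrative bucket
      signature_key: serving_default            # optional

# Multiple models: each entry under `paths` uses `path` (formerly `model_path`).
# `cache_size` and `disk_cache_size` must be set together, are Realtime-API-only,
# and `cache_size` may not exceed `disk_cache_size`.
- name: multi-model-classifier
  predictor:
    type: tensorflow
    path: predictor.py
    models:
      paths:
        - name: inception
          path: s3://my-bucket/inception/
        - name: resnet50
          path: s3://my-bucket/resnet50/
      cache_size: 1
      disk_cache_size: 2
```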
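
Python predictors spell the same block `multi_model_reloading`: the `models` key is rejected for the python type, and `multi_model_reloading` is in turn rejected for tensorflow/onnx predictors and for Batch APIs (see `validatePredictor`). A minimal sketch under the same illustrative-bucket assumption; note that `signature_key` is not accepted anywhere inside this block:

```yaml
- name: mpg-estimator
  predictor:
    type: python
    path: predictor.py
    multi_model_reloading:
      dir: s3://my-bucket/models/  # or `path:` / `paths:`, exactly one of the three
```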