diff --git a/cli/cmd/lib_realtime_apis.go b/cli/cmd/lib_realtime_apis.go
index 1d335716d3..85b1d0df5a 100644
--- a/cli/cmd/lib_realtime_apis.go
+++ b/cli/cmd/lib_realtime_apis.go
@@ -21,7 +21,6 @@ import (
     "fmt"
     "io/ioutil"
     "net/http"
-    "sort"
     "strconv"
     "strings"
     "time"
@@ -53,15 +52,6 @@ func realtimeAPITable(realtimeAPI schema.APIResponse, env cliconfig.Environment)
     out += t.MustFormat()
 
-    if env.Provider != types.LocalProviderType && realtimeAPI.Spec.Monitoring != nil {
-        switch realtimeAPI.Spec.Monitoring.ModelType {
-        case userconfig.ClassificationModelType:
-            out += "\n" + classificationMetricsStr(realtimeAPI.Metrics)
-        case userconfig.RegressionModelType:
-            out += "\n" + regressionMetricsStr(realtimeAPI.Metrics)
-        }
-    }
-
     if realtimeAPI.DashboardURL != nil && *realtimeAPI.DashboardURL != "" {
         out += "\n" + console.Bold("metrics dashboard: ") + *realtimeAPI.DashboardURL + "\n"
     }
 
@@ -169,75 +159,6 @@ func code5XXStr(metrics *metrics.Metrics) string {
     return s.Int(metrics.NetworkStats.Code5XX)
 }
 
-func regressionMetricsStr(metrics *metrics.Metrics) string {
-    minStr := "-"
-    maxStr := "-"
-    avgStr := "-"
-
-    if metrics.RegressionStats != nil {
-        if metrics.RegressionStats.Min != nil {
-            minStr = fmt.Sprintf("%.9g", *metrics.RegressionStats.Min)
-        }
-
-        if metrics.RegressionStats.Max != nil {
-            maxStr = fmt.Sprintf("%.9g", *metrics.RegressionStats.Max)
-        }
-
-        if metrics.RegressionStats.Avg != nil {
-            avgStr = fmt.Sprintf("%.9g", *metrics.RegressionStats.Avg)
-        }
-    }
-
-    t := table.Table{
-        Headers: []table.Header{
-            {Title: "min", MaxWidth: 10},
-            {Title: "max", MaxWidth: 10},
-            {Title: "avg", MaxWidth: 10},
-        },
-        Rows: [][]interface{}{{minStr, maxStr, avgStr}},
-    }
-
-    return t.MustFormat()
-}
-
-func classificationMetricsStr(metrics *metrics.Metrics) string {
-    classList := make([]string, 0, len(metrics.ClassDistribution))
-    for inputName := range metrics.ClassDistribution {
-        classList = append(classList, inputName)
-    }
-    sort.Strings(classList)
-
-    rows := make([][]interface{}, len(classList))
-    for rowNum, className := range classList {
-        rows[rowNum] = []interface{}{
-            className,
-            metrics.ClassDistribution[className],
-        }
-    }
-
-    if len(classList) == 0 {
-        rows = append(rows, []interface{}{
-            "-",
-            "-",
-        })
-    }
-
-    t := table.Table{
-        Headers: []table.Header{
-            {Title: "class", MaxWidth: 40},
-            {Title: "count", MaxWidth: 20},
-        },
-        Rows: rows,
-    }
-
-    out := t.MustFormat()
-
-    if len(classList) == consts.MaxClassesPerMonitoringRequest {
-        out += fmt.Sprintf("\nlisting at most %d classes, the complete list can be found in your cloudwatch dashboard\n", consts.MaxClassesPerMonitoringRequest)
-    }
-    return out
-}
-
 func describeModelInput(status *status.Status, predictor *userconfig.Predictor, apiEndpoint string) string {
     if status.Updated.Ready+status.Stale.Ready == 0 {
         return "the models' metadata schema will be available when the api is live\n"
diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go
index 94713e13d6..66af3e768a 100644
--- a/pkg/consts/consts.go
+++ b/pkg/consts/consts.go
@@ -50,10 +50,9 @@ var (
         DefaultImageONNXPredictorGPU,
     )
 
-    MaxClassesPerMonitoringRequest = 20 // cloudwatch.GeMetricData can get up to 100 metrics per request, avoid multiple requests and have room for other stats
-    DashboardTitle                 = "# cortex monitoring dashboard"
-    DefaultMaxReplicaConcurrency   = int64(1024)
-    NeuronCoresPerInf              = int64(4)
+    DashboardTitle               = "# cortex monitoring dashboard"
+    DefaultMaxReplicaConcurrency = int64(1024)
+    NeuronCoresPerInf            = int64(4)
 )
 
 func defaultDockerImage(imageName string) string {
diff --git a/pkg/cortex/serve/cortex_internal/lib/api/__init__.py b/pkg/cortex/serve/cortex_internal/lib/api/__init__.py
index 4159536206..16cf6e6082 100644
--- a/pkg/cortex/serve/cortex_internal/lib/api/__init__.py
+++ b/pkg/cortex/serve/cortex_internal/lib/api/__init__.py
@@ -13,5 +13,4 @@
 # limitations under the License.
 
 from cortex_internal.lib.api.predictor import Predictor
-from cortex_internal.lib.api.monitoring import Monitoring
 from cortex_internal.lib.api.api import API, get_api, get_spec
diff --git a/pkg/cortex/serve/cortex_internal/lib/api/api.py b/pkg/cortex/serve/cortex_internal/lib/api/api.py
index 8333672103..77b8d62804 100644
--- a/pkg/cortex/serve/cortex_internal/lib/api/api.py
+++ b/pkg/cortex/serve/cortex_internal/lib/api/api.py
@@ -20,7 +20,7 @@
 from typing import Any, Dict, Optional, Tuple, Union
 
 import datadog
-from cortex_internal.lib.api import Monitoring, Predictor
+from cortex_internal.lib.api import Predictor
 from cortex_internal.lib.exceptions import CortexException
 from cortex_internal.lib.storage import LocalStorage, S3, GCS
 from cortex_internal.lib.log import logger
@@ -49,10 +49,6 @@ def __init__(
         self.name = api_spec["name"]
         self.predictor = Predictor(provider, api_spec, model_dir)
 
-        self.monitoring = None
-        if self.api_spec.get("monitoring") is not None:
-            self.monitoring = Monitoring(**self.api_spec["monitoring"])
-
         if provider != "local":
             host_ip = os.environ["HOST_IP"]
             datadog.initialize(statsd_host=host_ip, statsd_port="8125")
@@ -65,24 +61,6 @@ def server_side_batching_enabled(self):
         return self.api_spec["predictor"].get("server_side_batching") is not None
 
-    def get_cached_classes(self):
-        prefix = os.path.join(self.metadata_root, "classes") + "/"
-        class_paths, _ = self.storage.search(prefix=prefix)
-        class_set = set()
-        for class_path in class_paths:
-            encoded_class_name = class_path.split("/")[-1]
-            class_set.add(base64.urlsafe_b64decode(encoded_class_name.encode()).decode())
-        return class_set
-
-    def upload_class(self, class_name: str):
-        try:
-            ascii_encoded = class_name.encode("ascii")  # cloudwatch only supports ascii
-            encoded_class_name = base64.urlsafe_b64encode(ascii_encoded)
-            key = os.path.join(self.metadata_root, "classes", encoded_class_name.decode())
-            self.storage.put_json("", key)
-        except Exception as e:
-            raise ValueError("unable to store class {}".format(class_name)) from e
-
     def metric_dimensions_with_id(self):
         return [
             {"Name": "APIName", "Value": self.name},
@@ -106,14 +84,6 @@ def post_request_metrics(self, status_code, total_time):
         ]
         self.post_metrics(metrics)
 
-    def post_monitoring_metrics(self, prediction_value=None):
-        if prediction_value is not None:
-            metrics = [
-                self.prediction_metrics(self.metric_dimensions(), prediction_value),
-                self.prediction_metrics(self.metric_dimensions_with_id(), prediction_value),
-            ]
-            self.post_metrics(metrics)
-
     def post_metrics(self, metrics):
         try:
             if self.statsd is None:
@@ -168,22 +138,6 @@ def latency_metric(self, dimensions, total_time):
             "Value": total_time,  # milliseconds
         }
 
-    def prediction_metrics(self, dimensions, prediction_value):
-        if self.monitoring.model_type == "classification":
-            dimensions_with_class = dimensions + [{"Name": "Class", "Value": str(prediction_value)}]
-            return {
-                "MetricName": "Prediction",
-                "Dimensions": dimensions_with_class,
-                "Unit": "Count",
-                "Value": 1,
-            }
-        else:
-            return {
-                "MetricName": "Prediction",
-                "Dimensions": dimensions,
-                "Value": float(prediction_value),
-            }
-
 
 def get_api(
     provider: str,
diff --git a/pkg/cortex/serve/cortex_internal/lib/api/monitoring.py b/pkg/cortex/serve/cortex_internal/lib/api/monitoring.py
deleted file mode 100644
index 9fafe3ab05..0000000000
--- a/pkg/cortex/serve/cortex_internal/lib/api/monitoring.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# Copyright 2020 Cortex Labs, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-class Monitoring:
-    def __init__(self, **kwargs):
-        self.key = kwargs.get("key")
-        self.model_type = kwargs["model_type"]
-
-    def extract_predicted_value(self, prediction):
-        if self.key is not None:
-            if type(prediction) != dict:
-                raise ValueError(
-                    "failed to extract key '{}' for monitoring: expected prediction response to be of type dict but found '{}'".format(
-                        self.key, type(prediction)
-                    )
-                )
-            if prediction.get(self.key) is None:
-                raise ValueError(
-                    "failed to extract key '{}' for monitoring: key '{}' not found in prediction response".format(
-                        self.key, self.key
-                    )
-                )
-            predicted_value = prediction[self.key]
-        else:
-            predicted_value = prediction
-
-        if self.model_type == "classification":
-            if type(predicted_value) != str and type(predicted_value) != int:
-                raise ValueError(
-                    "failed to parse classification prediction for monitoring: expected type 'str' or 'int' but encountered '{}'".format(
-                        type(predicted_value)
-                    )
-                )
-            return str(predicted_value)
-        else:
-            if type(predicted_value) != float and type(predicted_value) != int:
-                raise ValueError(
-                    "failed to parse regression prediction for monitoring: expected type 'float' or 'int' but encountered '{}'".format(
-                        type(predicted_value)
-                    )
-                )
-            return predicted_value
diff --git a/pkg/cortex/serve/cortex_internal/serve/serve.py b/pkg/cortex/serve/cortex_internal/serve/serve.py
index 0af2beffb8..e73ec0d181 100644
--- a/pkg/cortex/serve/cortex_internal/serve/serve.py
+++ b/pkg/cortex/serve/cortex_internal/serve/serve.py
@@ -36,7 +36,6 @@
 from cortex_internal.lib.exceptions import UserRuntimeException
 from fastapi import FastAPI
 from fastapi.exceptions import RequestValidationError
-from starlette.background import BackgroundTasks
 from starlette.exceptions import HTTPException as StarletteHTTPException
 from starlette.requests import Request
 from starlette.responses import JSONResponse, PlainTextResponse, Response
@@ -62,7 +61,6 @@
     "dynamic_batcher": None,
     "predict_route": None,
     "client": None,
-    "class_set": set(),
 }
 
 
@@ -191,8 +189,6 @@ async def parse_payload(request: Request, call_next):
 
 def predict(request: Request):
-    tasks = BackgroundTasks()
-
     api = local_cache["api"]
     predictor_impl = local_cache["predictor_impl"]
     dynamic_batcher = local_cache["dynamic_batcher"]
     kwargs = build_predict_kwargs(request)
@@ -219,26 +215,10 @@ def predict(request: Request):
             ) from e
         response = Response(content=json_string, media_type="application/json")
 
-    if local_cache["provider"] not in ["local", "gcp"] and api.monitoring is not None:
-        try:
-            predicted_value = api.monitoring.extract_predicted_value(prediction)
-            api.post_monitoring_metrics(predicted_value)
-            if (
-                api.monitoring.model_type == "classification"
-                and predicted_value not in local_cache["class_set"]
-            ):
-                tasks.add_task(api.upload_class, class_name=predicted_value)
-                local_cache["class_set"].add(predicted_value)
-        except:
-            logger.warn("unable to record prediction metric", exc_info=True)
-
     if util.has_method(predictor_impl, "post_predict"):
         kwargs = build_post_predict_kwargs(prediction, request)
         request_thread_pool.submit(predictor_impl.post_predict, **kwargs)
 
-    if len(tasks.tasks) > 0:
-        response.background = tasks
-
     return response
 
@@ -355,16 +335,6 @@ def start_fn():
         logger.exception("failed to start api")
         sys.exit(1)
 
-    if (
-        provider != "local"
-        and api.monitoring is not None
-        and api.monitoring.model_type == "classification"
-    ):
-        try:
-            local_cache["class_set"] = api.get_cached_classes()
-        except:
-            logger.warn("an error occurred while attempting to load classes", exc_info=True)
-
     app.add_api_route(local_cache["predict_route"], predict, methods=["POST"])
     app.add_api_route(local_cache["predict_route"], get_summary, methods=["GET"])
diff --git a/pkg/operator/resources/realtimeapi/metrics.go b/pkg/operator/resources/realtimeapi/metrics.go
index 15628f9681..6208d725ae 100644
--- a/pkg/operator/resources/realtimeapi/metrics.go
+++ b/pkg/operator/resources/realtimeapi/metrics.go
@@ -17,24 +17,18 @@ limitations under the License.
 package realtimeapi
 
 import (
-    "encoding/base64"
-    "fmt"
-    "path/filepath"
     "strings"
     "time"
 
     "github.com/aws/aws-sdk-go/aws"
     "github.com/aws/aws-sdk-go/service/cloudwatch"
-    "github.com/cortexlabs/cortex/pkg/consts"
     "github.com/cortexlabs/cortex/pkg/lib/errors"
     "github.com/cortexlabs/cortex/pkg/lib/parallel"
-    "github.com/cortexlabs/cortex/pkg/lib/pointer"
     "github.com/cortexlabs/cortex/pkg/lib/slices"
     "github.com/cortexlabs/cortex/pkg/operator/config"
     "github.com/cortexlabs/cortex/pkg/types"
     "github.com/cortexlabs/cortex/pkg/types/metrics"
     "github.com/cortexlabs/cortex/pkg/types/spec"
-    "github.com/cortexlabs/cortex/pkg/types/userconfig"
 )
 
 func GetMultipleMetrics(apis []spec.API) ([]metrics.Metrics, error) {
@@ -107,17 +101,6 @@ func getMetricsFunc(api *spec.API, period int64, startTime *time.Time, endTime *
         }
         metrics.NetworkStats = networkStats
 
-        if api.Monitoring != nil {
-            if api.Monitoring.ModelType == userconfig.ClassificationModelType {
-                metrics.ClassDistribution = extractClassificationMetrics(metricDataResults)
-            } else {
-                regressionStats, err := extractRegressionMetrics(metricDataResults)
-                if err != nil {
-                    return err
-                }
-                metrics.RegressionStats = regressionStats
-            }
-        }
         return nil
     }
 }
@@ -125,19 +108,6 @@ func getMetricsFunc(api *spec.API, period int64, startTime *time.Time, endTime *
 func queryMetrics(api *spec.API, period int64, startTime *time.Time, endTime *time.Time) ([]*cloudwatch.MetricDataResult, error) {
     allMetrics := getNetworkStatsDef(api, period)
 
-    if api.Monitoring != nil {
-        if api.Monitoring.ModelType == userconfig.ClassificationModelType {
-            classMetrics, err := getClassesMetricDef(api, period)
-            if err != nil {
-                return nil, err
-            }
-            allMetrics = append(allMetrics, classMetrics...)
-        } else {
-            regressionMetrics := getRegressionMetricDef(api, period)
-            allMetrics = append(allMetrics, regressionMetrics...)
-        }
-    }
-
     metricsDataQuery := cloudwatch.GetMetricDataInput{
         EndTime:   endTime,
         StartTime: startTime,
@@ -199,38 +169,6 @@ func extractClassificationMetrics(metricsDataResults []*cloudwatch.MetricDataRes
     return classDistribution
 }
 
-func extractRegressionMetrics(metricsDataResults []*cloudwatch.MetricDataResult) (*metrics.RegressionStats, error) {
-    var regressionStats metrics.RegressionStats
-    var predictionAvgs []*float64
-    var requestCounts []*float64
-
-    for _, metricData := range metricsDataResults {
-        if metricData.Values == nil {
-            continue
-        }
-
-        switch {
-        case *metricData.Label == "Min":
-            regressionStats.Min = slices.Float64PtrMin(metricData.Values...)
-        case *metricData.Label == "Max":
-            regressionStats.Max = slices.Float64PtrMax(metricData.Values...)
-        case *metricData.Label == "SampleCount":
-            regressionStats.SampleCount = slices.Float64PtrSumInt(metricData.Values...)
-            requestCounts = metricData.Values
-        case *metricData.Label == "Avg":
-            predictionAvgs = metricData.Values
-        }
-    }
-
-    avg, err := slices.Float64PtrAvg(predictionAvgs, requestCounts)
-    if err != nil {
-        return nil, err
-    }
-    regressionStats.Avg = avg
-
-    return &regressionStats, nil
-}
-
 func getAPIDimensions(api *spec.API) []*cloudwatch.Dimension {
     return []*cloudwatch.Dimension{
         {
@@ -268,55 +206,6 @@ func getAPIDimensionsHistogram(api *spec.API) []*cloudwatch.Dimension {
     )
 }
 
-func getRegressionMetricDef(api *spec.API, period int64) []*cloudwatch.MetricDataQuery {
-    metric := &cloudwatch.Metric{
-        Namespace:  aws.String(config.Cluster.ClusterName),
-        MetricName: aws.String("Prediction"),
-        Dimensions: getAPIDimensionsHistogram(api),
-    }
-
-    regressionMetric := []*cloudwatch.MetricDataQuery{
-        {
-            Id:    aws.String("min"),
-            Label: aws.String("Min"),
-            MetricStat: &cloudwatch.MetricStat{
-                Metric: metric,
-                Stat:   aws.String("Minimum"),
-                Period: aws.Int64(period),
-            },
-        },
-        {
-            Id:    aws.String("max"),
-            Label: aws.String("Max"),
-            MetricStat: &cloudwatch.MetricStat{
-                Metric: metric,
-                Stat:   aws.String("Maximum"),
-                Period: aws.Int64(period),
-            },
-        },
-        {
-            Id:    aws.String("sample_count"),
-            Label: aws.String("SampleCount"),
-            MetricStat: &cloudwatch.MetricStat{
-                Metric: metric,
-                Stat:   aws.String("SampleCount"),
-                Period: aws.Int64(period),
-            },
-        },
-        {
-            Id:    aws.String("avg"),
-            Label: aws.String("Avg"),
-            MetricStat: &cloudwatch.MetricStat{
-                Metric: metric,
-                Stat:   aws.String("Average"),
-                Period: aws.Int64(period),
-            },
-        },
-    }
-
-    return regressionMetric
-}
-
 func getNetworkStatsDef(api *spec.API, period int64) []*cloudwatch.MetricDataQuery {
     statusCodes := []string{"2XX", "4XX", "5XX"}
     networkDataQueries := make([]*cloudwatch.MetricDataQuery, len(statusCodes)+2)
@@ -371,50 +260,3 @@ func getNetworkStatsDef(api *spec.API, period int64) []*cloudwatch.MetricDataQue
     }
     return networkDataQueries
 }
-
-func getClassesMetricDef(api *spec.API, period int64) ([]*cloudwatch.MetricDataQuery, error) {
-    prefix := filepath.Join(api.MetadataRoot, "classes") + "/"
-    classes, err := config.AWS.ListS3Prefix(config.Cluster.Bucket, prefix, false, pointer.Int64(int64(consts.MaxClassesPerMonitoringRequest)))
-    if err != nil {
-        return nil, err
-    }
-
-    if len(classes) == 0 {
-        return nil, nil
-    }
-
-    classMetricQueries := []*cloudwatch.MetricDataQuery{}
-
-    for i, classObj := range classes {
-        classKey := *classObj.Key
-        urlSplit := strings.Split(classKey, "/")
-        encodedClassName := urlSplit[len(urlSplit)-1]
-        decodedBytes, err := base64.URLEncoding.DecodeString(encodedClassName)
-        if err != nil {
-            return nil, errors.Wrap(err, "encoded class name", encodedClassName)
-        }
-
-        className := string(decodedBytes)
-        if len(className) == 0 {
-            continue
-        }
-
-        classMetricQueries = append(classMetricQueries, &cloudwatch.MetricDataQuery{
-            Id: aws.String(fmt.Sprintf("id_%d", i)),
-            MetricStat: &cloudwatch.MetricStat{
-                Metric: &cloudwatch.Metric{
-                    Namespace:  aws.String(config.Cluster.ClusterName),
-                    MetricName: aws.String("Prediction"),
-                    Dimensions: append(getAPIDimensionsCounter(api), &cloudwatch.Dimension{
-                        Name:  aws.String("Class"),
-                        Value: aws.String(className),
-                    }),
-                },
-                Stat:   aws.String("Sum"),
-                Period: aws.Int64(period),
-            },
-            Label: aws.String("class_" + className),
-        })
-    }
-    return classMetricQueries, nil
-}
diff --git a/pkg/types/metrics/metrics.go b/pkg/types/metrics/metrics.go
index b7b074b685..abef3aaae3 100644
--- a/pkg/types/metrics/metrics.go
+++ b/pkg/types/metrics/metrics.go
@@ -22,40 +22,11 @@ import (
 )
 
 type Metrics struct {
-    APIName           string           `json:"api_name"`
-    NetworkStats      *NetworkStats    `json:"network_stats"`
-    ClassDistribution map[string]int   `json:"class_distribution"`
-    RegressionStats   *RegressionStats `json:"regression_stats"`
-}
-
-type NetworkStats struct {
-    Latency *float64 `json:"latency"`
-    Code2XX int      `json:"code_2xx"`
-    Code4XX int      `json:"code_4xx"`
-    Code5XX int      `json:"code_5xx"`
-    Total   int      `json:"total"`
-}
-
-type RegressionStats struct {
-    Min         *float64 `json:"min"`
-    Max         *float64 `json:"max"`
-    Avg         *float64 `json:"avg"`
-    SampleCount int      `json:"sample_count"`
+    APIName      string        `json:"api_name"`
+    NetworkStats *NetworkStats `json:"network_stats"`
 }
 
 func (left Metrics) Merge(right Metrics) Metrics {
-    mergedClassDistribution := left.ClassDistribution
-
-    if right.ClassDistribution != nil {
-        if left.ClassDistribution == nil {
-            mergedClassDistribution = right.ClassDistribution
-        } else {
-            for className, count := range right.ClassDistribution {
-                mergedClassDistribution[className] += count
-            }
-        }
-    }
-
     var mergedNetworkStats *NetworkStats
     switch {
     case left.NetworkStats != nil && right.NetworkStats != nil:
@@ -67,24 +38,19 @@ func (left Metrics) Merge(right Metrics) Metrics {
         mergedNetworkStats = right.NetworkStats
     }
 
-    var mergedRegressionStats *RegressionStats
-    switch {
-    case left.RegressionStats != nil && right.RegressionStats != nil:
-        merged := (*left.RegressionStats).Merge(*right.RegressionStats)
-        mergedRegressionStats = &merged
-    case left.RegressionStats != nil:
-        mergedRegressionStats = left.RegressionStats
-    case right.RegressionStats != nil:
-        mergedRegressionStats = right.RegressionStats
-    }
-
     return Metrics{
-        NetworkStats:      mergedNetworkStats,
-        RegressionStats:   mergedRegressionStats,
-        ClassDistribution: mergedClassDistribution,
+        NetworkStats: mergedNetworkStats,
     }
 }
 
+type NetworkStats struct {
+    Latency *float64 `json:"latency"`
+    Code2XX int      `json:"code_2xx"`
+    Code4XX int      `json:"code_4xx"`
+    Code5XX int      `json:"code_5xx"`
+    Total   int      `json:"total"`
+}
+
 func (left NetworkStats) Merge(right NetworkStats) NetworkStats {
     return NetworkStats{
         Latency: mergeAvg(left.Latency, left.Total, right.Latency, right.Total),
@@ -95,17 +61,6 @@ func (left NetworkStats) Merge(right NetworkStats) NetworkStats {
     }
 }
 
-func (left RegressionStats) Merge(right RegressionStats) RegressionStats {
-    totalSampleCount := left.SampleCount + right.SampleCount
-
-    return RegressionStats{
-        Min:         slices.Float64PtrMin(left.Min, right.Min),
-        Max:         slices.Float64PtrMax(left.Max, right.Max),
-        Avg:         mergeAvg(left.Avg, left.SampleCount, right.Avg, right.SampleCount),
-        SampleCount: totalSampleCount,
-    }
-}
-
 func mergeAvg(left *float64, leftCount int, right *float64, rightCount int) *float64 {
     leftCountFloat64Ptr := pointer.Float64(float64(leftCount))
     rightCountFloat64Ptr := pointer.Float64(float64(rightCount))
diff --git a/pkg/types/metrics/metrics_test.go b/pkg/types/metrics/metrics_test.go
index 54c2303eff..6924592cdc 100644
--- a/pkg/types/metrics/metrics_test.go
+++ b/pkg/types/metrics/metrics_test.go
@@ -42,43 +42,6 @@ func TestMergeAvg(t *testing.T) {
     require.Equal(t, float64(1.25), *mergeAvg(pointer.Float64(2), 1, pointer.Float64(1), 3))
 }
 
-func TestRegressionStatsMerge(t *testing.T) {
-    require.Equal(t, RegressionStats{}, RegressionStats{}.Merge(RegressionStats{}))
-    require.Equal(t, RegressionStats{Min: pointer.Float64(1)}, RegressionStats{Min: pointer.Float64(1)}.Merge(RegressionStats{}))
-    require.Equal(t, RegressionStats{Min: pointer.Float64(1)}, RegressionStats{}.Merge(RegressionStats{Min: pointer.Float64(1)}))
-    require.Equal(t, RegressionStats{Min: pointer.Float64(1)}, RegressionStats{Min: pointer.Float64(2)}.Merge(RegressionStats{Min: pointer.Float64(1)}))
-    require.Equal(t, RegressionStats{Min: pointer.Float64(1)}, RegressionStats{Min: pointer.Float64(1)}.Merge(RegressionStats{Min: pointer.Float64(2)}))
-
-    require.Equal(t, RegressionStats{Max: pointer.Float64(1)}, RegressionStats{Max: pointer.Float64(1)}.Merge(RegressionStats{}))
-    require.Equal(t, RegressionStats{Max: pointer.Float64(1)}, RegressionStats{}.Merge(RegressionStats{Max: pointer.Float64(1)}))
-    require.Equal(t, RegressionStats{Max: pointer.Float64(2)}, RegressionStats{Max: pointer.Float64(2)}.Merge(RegressionStats{Max: pointer.Float64(1)}))
-    require.Equal(t, RegressionStats{Max: pointer.Float64(2)}, RegressionStats{Max: pointer.Float64(1)}.Merge(RegressionStats{Max: pointer.Float64(2)}))
-
-    left := RegressionStats{
-        Max:         pointer.Float64(5),
-        Min:         pointer.Float64(2),
-        Avg:         pointer.Float64(3.5),
-        SampleCount: 4,
-    }
-
-    right := RegressionStats{
-        Max:         pointer.Float64(6),
-        Min:         pointer.Float64(1),
-        Avg:         pointer.Float64(3.5),
-        SampleCount: 6,
-    }
-
-    merged := RegressionStats{
-        Max:         pointer.Float64(6),
-        Min:         pointer.Float64(1),
-        Avg:         pointer.Float64(3.5),
-        SampleCount: 10,
-    }
-
-    require.Equal(t, merged, left.Merge(right))
-    require.Equal(t, merged, right.Merge(left))
-}
-
 func TestNetworkStatsMerge(t *testing.T) {
     require.Equal(t, NetworkStats{}, NetworkStats{}.Merge(NetworkStats{}))
 
@@ -113,12 +76,6 @@ func TestNetworkStatsMerge(t *testing.T) {
 func TestAPIMetricsMerge(t *testing.T) {
     require.Equal(t, Metrics{}, Metrics{}.Merge(Metrics{}))
 
-    classDistribution := map[string]int{
-        "class_a": 1,
-        "class_b": 2,
-        "class_c": 4,
-    }
-
     networkStats := NetworkStats{
         Code2XX: 3,
         Code4XX: 4,
@@ -127,35 +84,14 @@ func TestAPIMetricsMerge(t *testing.T) {
         Total:   12,
     }
 
-    regressionStats := RegressionStats{
-        Max:         pointer.Float64(6),
-        Min:         pointer.Float64(1),
-        Avg:         pointer.Float64(3.5),
-        SampleCount: 6,
-    }
-
     mergedNetworkStats := networkStats.Merge(networkStats)
-    mergedRegressionStats := regressionStats.Merge(regressionStats)
-
-    mergedClassDistribution := map[string]int{
-        "class_a": 2,
-        "class_b": 4,
-        "class_c": 8,
-    }
-
-    require.Equal(t, Metrics{ClassDistribution: classDistribution}, Metrics{ClassDistribution: classDistribution}.Merge(Metrics{}))
-    require.Equal(t, Metrics{ClassDistribution: classDistribution}, Metrics{}.Merge(Metrics{ClassDistribution: classDistribution}))
 
     mergedAPIMetrics := Metrics{
-        ClassDistribution: mergedClassDistribution,
-        NetworkStats:      &mergedNetworkStats,
-        RegressionStats:   &mergedRegressionStats,
+        NetworkStats: &mergedNetworkStats,
     }
 
     apiMetrics := Metrics{
-        ClassDistribution: classDistribution,
-        NetworkStats:      &networkStats,
-        RegressionStats:   &regressionStats,
+        NetworkStats: &networkStats,
     }
 
     require.Equal(t, mergedAPIMetrics, apiMetrics.Merge(apiMetrics))
diff --git a/pkg/types/spec/api.go b/pkg/types/spec/api.go
index f34da5ccc4..ee7d028edb 100644
--- a/pkg/types/spec/api.go
+++ b/pkg/types/spec/api.go
@@ -70,7 +70,6 @@ APIID (uniquely identifies an api configuration for a given deployment)
 * PredictorID (used to determine when rolling updates need to happen)
   * Resource
   * Predictor
-  * Monitoring
   * Compute
   * ProjectID
 * Deployment Strategy
@@ -84,7 +83,6 @@ func GetAPISpec(apiConfig *userconfig.API, projectID string, deploymentID string
 
     buf.WriteString(s.Obj(apiConfig.Resource))
     buf.WriteString(s.Obj(apiConfig.Predictor))
-    buf.WriteString(s.Obj(apiConfig.Monitoring))
     buf.WriteString(projectID)
     if apiConfig.Compute != nil {
         buf.WriteString(s.Obj(apiConfig.Compute.Normalized()))
diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go
index 94d923edea..61872057f0 100644
--- a/pkg/types/spec/validations.go
+++ b/pkg/types/spec/validations.go
@@ -65,7 +65,6 @@ func apiValidation(
         predictorValidation(),
         networkingValidation(resource.Kind, provider, awsClusterConfig, gcpClusterConfig),
         computeValidation(provider),
-        monitoringValidation(provider),
         autoscalingValidation(provider),
         updateStrategyValidation(provider),
     )
@@ -239,42 +238,6 @@ func predictorValidation() *cr.StructFieldValidation {
     }
 }
 
-func monitoringValidation(provider types.ProviderType) *cr.StructFieldValidation {
-    if provider != types.AWSProviderType && provider != types.LocalProviderType {
-        return &cr.StructFieldValidation{
-            StructField: "Monitoring",
-            StructValidation: &cr.StructValidation{
-                CantBeSpecifiedErrStr: pointer.String("only supported on AWS clusters"),
-            },
-        }
-    }
-
-    return &cr.StructFieldValidation{
-        StructField: "Monitoring",
-        StructValidation: &cr.StructValidation{
-            DefaultNil:        true,
-            AllowExplicitNull: true,
-            StructFieldValidations: []*cr.StructFieldValidation{
-                {
-                    StructField:         "Key",
-                    StringPtrValidation: &cr.StringPtrValidation{},
-                },
-                {
-                    StructField: "ModelType",
-                    StringValidation: &cr.StringValidation{
-                        Required:      false,
-                        AllowEmpty:    true,
-                        AllowedValues: userconfig.ModelTypeStrings(),
-                    },
-                    Parser: func(str string) (interface{}, error) {
-                        return userconfig.ModelTypeFromString(str), nil
-                    },
-                },
-            },
-        },
-    }
-}
-
 func networkingValidation(
     kind userconfig.Kind,
     provider types.ProviderType,
diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go
index adaef3859b..613a8796e5 100644
--- a/pkg/types/userconfig/api.go
+++ b/pkg/types/userconfig/api.go
@@ -34,7 +34,6 @@ type API struct {
     Resource
     APIs           []*TrafficSplit `json:"apis" yaml:"apis"`
     Predictor      *Predictor      `json:"predictor" yaml:"predictor"`
-    Monitoring     *Monitoring     `json:"monitoring" yaml:"monitoring"`
     Networking     *Networking     `json:"networking" yaml:"networking"`
     Compute        *Compute        `json:"compute" yaml:"compute"`
     Autoscaling    *Autoscaling    `json:"autoscaling" yaml:"autoscaling"`
@@ -82,11 +81,6 @@ type ModelResource struct {
     SignatureKey *string `json:"signature_key" yaml:"signature_key"`
 }
 
-type Monitoring struct {
-    Key       *string   `json:"key" yaml:"key"`
-    ModelType ModelType `json:"model_type" yaml:"model_type"`
-}
-
 type ServerSideBatching struct {
     MaxBatchSize  int32         `json:"max_batch_size" yaml:"max_batch_size"`
     BatchInterval time.Duration `json:"batch_interval" yaml:"batch_interval"`
 }
@@ -342,13 +336,6 @@ func (api *API) UserStr(provider types.ProviderType) string {
             sb.WriteString(fmt.Sprintf("%s:\n", UpdateStrategyKey))
             sb.WriteString(s.Indent(api.UpdateStrategy.UserStr(), " "))
         }
-
-        if provider == types.AWSProviderType {
-            if api.Monitoring != nil {
-                sb.WriteString(fmt.Sprintf("%s:\n", MonitoringKey))
-                sb.WriteString(s.Indent(api.Monitoring.UserStr(), " "))
-            }
-        }
     }
     return sb.String()
 }
@@ -450,15 +437,6 @@ func (batch *ServerSideBatching) UserStr() string {
     return sb.String()
 }
 
-func (monitoring *Monitoring) UserStr() string {
-    var sb strings.Builder
-    sb.WriteString(fmt.Sprintf("%s: %s\n", ModelTypeKey, monitoring.ModelType.String()))
-    if monitoring.Key != nil {
-        sb.WriteString(fmt.Sprintf("%s: %s\n", KeyKey, *monitoring.Key))
-    }
-    return sb.String()
-}
-
 func (networking *Networking) UserStr(provider types.ProviderType) string {
     var sb strings.Builder
     if provider == types.LocalProviderType && networking.LocalPort != nil {
@@ -591,14 +569,6 @@ func (api *API) TelemetryEvent(provider types.ProviderType) map[string]interface
         event["apis._len"] = len(api.APIs)
     }
 
-    if api.Monitoring != nil {
-        event["monitoring._is_defined"] = true
-        event["monitoring.model_type"] = api.Monitoring.ModelType
-        if api.Monitoring.Key != nil {
-            event["monitoring.key._is_defined"] = true
-        }
-    }
-
     if api.Networking != nil {
         event["networking._is_defined"] = true
         event["networking.api_gateway"] = api.Networking.APIGateway
diff --git a/pkg/types/userconfig/config_key.go b/pkg/types/userconfig/config_key.go
index 1b36325511..f42334431d 100644
--- a/pkg/types/userconfig/config_key.go
+++ b/pkg/types/userconfig/config_key.go
@@ -21,7 +21,6 @@ const (
     NameKey        = "name"
    KindKey        = "kind"
     PredictorKey   = "predictor"
-    MonitoringKey  = "monitoring"
     NetworkingKey  = "networking"
     ComputeKey     = "compute"
     AutoscalingKey = "autoscaling"
@@ -63,10 +62,6 @@ const (
     // ModelResource
     ModelsNameKey = "name"
 
-    // Monitoring
-    KeyKey       = "key"
-    ModelTypeKey = "model_type"
-
     // Networking
     APIGatewayKey = "api_gateway"
     EndpointKey   = "endpoint"
diff --git a/pkg/types/userconfig/model_type.go b/pkg/types/userconfig/model_type.go
deleted file mode 100644
index 8ce21a552a..0000000000
--- a/pkg/types/userconfig/model_type.go
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
-Copyright 2020 Cortex Labs, Inc.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package userconfig
-
-type ModelType int
-
-const (
-    UnknownModelType ModelType = iota
-    ClassificationModelType
-    RegressionModelType
-)
-
-var _modelTypes = []string{
-    "unknown",
-    "classification",
-    "regression",
-}
-
-func ModelTypeFromString(s string) ModelType {
-    for i := 0; i < len(_modelTypes); i++ {
-        if s == _modelTypes[i] {
-            return ModelType(i)
-        }
-    }
-    return UnknownModelType
-}
-
-func ModelTypeStrings() []string {
-    return _modelTypes[1:]
-}
-
-func (t ModelType) String() string {
-    return _modelTypes[t]
-}
-
-// MarshalText satisfies TextMarshaler
-func (t ModelType) MarshalText() ([]byte, error) {
-    return []byte(t.String()), nil
-}
-
-// UnmarshalText satisfies TextUnmarshaler
-func (t *ModelType) UnmarshalText(text []byte) error {
-    enum := string(text)
-    for i := 0; i < len(_modelTypes); i++ {
-        if enum == _modelTypes[i] {
-            *t = ModelType(i)
-            return nil
-        }
-    }
-
-    *t = UnknownModelType
-    return nil
-}
-
-// UnmarshalBinary satisfies BinaryUnmarshaler
-// Needed for msgpack
-func (t *ModelType) UnmarshalBinary(data []byte) error {
-    return t.UnmarshalText(data)
-}
-
-// MarshalBinary satisfies BinaryMarshaler
-func (t ModelType) MarshalBinary() ([]byte, error) {
-    return []byte(t.String()), nil
-}
diff --git a/test/apis/onnx/iris-classifier/cortex.yaml b/test/apis/onnx/iris-classifier/cortex.yaml
index e08355b866..4ba3cb0d9b 100644
--- a/test/apis/onnx/iris-classifier/cortex.yaml
+++ b/test/apis/onnx/iris-classifier/cortex.yaml
@@ -4,6 +4,4 @@
     type: onnx
     path: predictor.py
     models:
-      path: s3://cortex-examples/onnx/iris-classifier/
-    monitoring:
-      model_type: classification
+      path: s3://cortex-examples/onnx/iris-classifier/
diff --git a/test/apis/pytorch/iris-classifier/cortex.yaml b/test/apis/pytorch/iris-classifier/cortex.yaml
index 57bf052d1c..2c214c3aac 100644
--- a/test/apis/pytorch/iris-classifier/cortex.yaml
+++ b/test/apis/pytorch/iris-classifier/cortex.yaml
@@ -5,5 +5,3 @@
     path: predictor.py
     config:
       model: s3://cortex-examples/pytorch/iris-classifier/weights.pth
-    monitoring:
-      model_type: classification
diff --git a/test/apis/pytorch/language-identifier/cortex.yaml b/test/apis/pytorch/language-identifier/cortex.yaml
index 1a157df06f..79d1308ace 100644
--- a/test/apis/pytorch/language-identifier/cortex.yaml
+++ b/test/apis/pytorch/language-identifier/cortex.yaml
@@ -3,5 +3,3 @@
   predictor:
     type: python
     path: predictor.py
-    monitoring:
-      model_type: classification
diff --git a/test/apis/sklearn/iris-classifier/cortex.yaml b/test/apis/sklearn/iris-classifier/cortex.yaml
index 1b0c73f685..2688c06649 100644
--- a/test/apis/sklearn/iris-classifier/cortex.yaml
+++ b/test/apis/sklearn/iris-classifier/cortex.yaml
@@ -6,8 +6,6 @@
     config:
       bucket: cortex-examples
       key: sklearn/iris-classifier/model.pkl
-    monitoring:
-      model_type: classification
   compute:
     cpu: 0.2
     mem: 200M
diff --git a/test/apis/sklearn/mpg-estimator/cortex.yaml b/test/apis/sklearn/mpg-estimator/cortex.yaml
index c389ded046..fffdba05a7 100644
--- a/test/apis/sklearn/mpg-estimator/cortex.yaml
+++ b/test/apis/sklearn/mpg-estimator/cortex.yaml
@@ -5,5 +5,3 @@
     path: predictor.py
     config:
       model: s3://cortex-examples/sklearn/mpg-estimator/linreg/
-    monitoring:
-      model_type: regression
diff --git a/test/apis/tensorflow/image-classifier-inception/cortex.yaml b/test/apis/tensorflow/image-classifier-inception/cortex.yaml
index a2d682073f..84daa64b0c 100644
--- a/test/apis/tensorflow/image-classifier-inception/cortex.yaml
+++ b/test/apis/tensorflow/image-classifier-inception/cortex.yaml
@@ -5,8 +5,6 @@
     path: predictor.py
     models:
       path: s3://cortex-examples/tensorflow/image-classifier/inception/
-    monitoring:
-      model_type: classification
   compute:
     cpu: 1
     gpu: 1
diff --git a/test/apis/tensorflow/image-classifier-inception/cortex_server_side_batching.yaml b/test/apis/tensorflow/image-classifier-inception/cortex_server_side_batching.yaml
index 745b6a9678..ba28a738a2 100644
--- a/test/apis/tensorflow/image-classifier-inception/cortex_server_side_batching.yaml
+++ b/test/apis/tensorflow/image-classifier-inception/cortex_server_side_batching.yaml
@@ -9,8 +9,6 @@
       max_batch_size: 2
       batch_interval: 0.2s
     threads_per_process: 2
-    monitoring:
-      model_type: classification
   compute:
     cpu: 1
     gpu: 1
diff --git a/test/apis/tensorflow/iris-classifier/cortex.yaml b/test/apis/tensorflow/iris-classifier/cortex.yaml
index df304caff0..96d7c4483b 100644
--- a/test/apis/tensorflow/iris-classifier/cortex.yaml
+++ b/test/apis/tensorflow/iris-classifier/cortex.yaml
@@ -5,5 +5,3 @@
     path: predictor.py
     models:
       path: s3://cortex-examples/tensorflow/iris-classifier/nn/
-    monitoring:
-      model_type: classification
diff --git a/test/apis/tensorflow/sentiment-analyzer/cortex.yaml b/test/apis/tensorflow/sentiment-analyzer/cortex.yaml
index 62ce78c8b5..012deb33da 100644
--- a/test/apis/tensorflow/sentiment-analyzer/cortex.yaml
+++ b/test/apis/tensorflow/sentiment-analyzer/cortex.yaml
@@ -5,8 +5,6 @@
     path: predictor.py
     models:
       path: s3://cortex-examples/tensorflow/sentiment-analyzer/bert/
-    monitoring:
-      model_type: classification
   compute:
     cpu: 1
     gpu: 1
diff --git a/test/apis/traffic-splitter/cortex.yaml b/test/apis/traffic-splitter/cortex.yaml
index 9f3b64f894..d11ecadde5 100644
--- a/test/apis/traffic-splitter/cortex.yaml
+++ b/test/apis/traffic-splitter/cortex.yaml
@@ -5,8 +5,6 @@
     path: pytorch_predictor.py
     config:
       model: s3://cortex-examples/pytorch/iris-classifier/weights.pth
-    monitoring:
-      model_type: classification
 
 - name: iris-classifier-onnx
   kind: RealtimeAPI
@@ -15,8 +13,6 @@
     path: onnx_predictor.py
     models:
       path: s3://cortex-examples/onnx/iris-classifier/
-    monitoring:
-      model_type: classification
 
 - name: iris-classifier
   kind: TrafficSplitter