Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 0 additions & 79 deletions cli/cmd/lib_realtime_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"io/ioutil"
"net/http"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -53,15 +52,6 @@ func realtimeAPITable(realtimeAPI schema.APIResponse, env cliconfig.Environment)

out += t.MustFormat()

if env.Provider != types.LocalProviderType && realtimeAPI.Spec.Monitoring != nil {
switch realtimeAPI.Spec.Monitoring.ModelType {
case userconfig.ClassificationModelType:
out += "\n" + classificationMetricsStr(realtimeAPI.Metrics)
case userconfig.RegressionModelType:
out += "\n" + regressionMetricsStr(realtimeAPI.Metrics)
}
}

if realtimeAPI.DashboardURL != nil && *realtimeAPI.DashboardURL != "" {
out += "\n" + console.Bold("metrics dashboard: ") + *realtimeAPI.DashboardURL + "\n"
}
Expand Down Expand Up @@ -169,75 +159,6 @@ func code5XXStr(metrics *metrics.Metrics) string {
return s.Int(metrics.NetworkStats.Code5XX)
}

// regressionMetricsStr renders the min/max/avg regression stats from the
// API's metrics as a formatted table. Any stat that has not been recorded
// yet (nil pointer, or no RegressionStats at all) is shown as "-".
func regressionMetricsStr(metrics *metrics.Metrics) string {
	// format renders a stat pointer, falling back to "-" when absent.
	format := func(v *float64) string {
		if v == nil {
			return "-"
		}
		return fmt.Sprintf("%.9g", *v)
	}

	minStr, maxStr, avgStr := "-", "-", "-"
	if stats := metrics.RegressionStats; stats != nil {
		minStr = format(stats.Min)
		maxStr = format(stats.Max)
		avgStr = format(stats.Avg)
	}

	t := table.Table{
		Headers: []table.Header{
			{Title: "min", MaxWidth: 10},
			{Title: "max", MaxWidth: 10},
			{Title: "avg", MaxWidth: 10},
		},
		Rows: [][]interface{}{{minStr, maxStr, avgStr}},
	}

	return t.MustFormat()
}

// classificationMetricsStr renders the per-class prediction counts as a
// formatted table, sorted by class name for deterministic output. When no
// classes have been recorded yet, a single placeholder row is shown. When
// the class count reaches the per-request monitoring cap, a note is
// appended pointing at the cloudwatch dashboard for the complete list.
func classificationMetricsStr(metrics *metrics.Metrics) string {
	classNames := make([]string, 0, len(metrics.ClassDistribution))
	for className := range metrics.ClassDistribution {
		classNames = append(classNames, className)
	}
	sort.Strings(classNames)

	var rows [][]interface{}
	if len(classNames) == 0 {
		rows = [][]interface{}{{"-", "-"}}
	} else {
		rows = make([][]interface{}, 0, len(classNames))
		for _, className := range classNames {
			rows = append(rows, []interface{}{className, metrics.ClassDistribution[className]})
		}
	}

	t := table.Table{
		Headers: []table.Header{
			{Title: "class", MaxWidth: 40},
			{Title: "count", MaxWidth: 20},
		},
		Rows: rows,
	}

	out := t.MustFormat()

	if len(classNames) == consts.MaxClassesPerMonitoringRequest {
		out += fmt.Sprintf("\nlisting at most %d classes, the complete list can be found in your cloudwatch dashboard\n", consts.MaxClassesPerMonitoringRequest)
	}
	return out
}

func describeModelInput(status *status.Status, predictor *userconfig.Predictor, apiEndpoint string) string {
if status.Updated.Ready+status.Stale.Ready == 0 {
return "the models' metadata schema will be available when the api is live\n"
Expand Down
7 changes: 3 additions & 4 deletions pkg/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,9 @@ var (
DefaultImageONNXPredictorGPU,
)

MaxClassesPerMonitoringRequest = 20 // cloudwatch.GetMetricData can get up to 100 metrics per request, avoid multiple requests and have room for other stats
DashboardTitle = "# cortex monitoring dashboard"
DefaultMaxReplicaConcurrency = int64(1024)
NeuronCoresPerInf = int64(4)
DashboardTitle = "# cortex monitoring dashboard"
DefaultMaxReplicaConcurrency = int64(1024)
NeuronCoresPerInf = int64(4)
)

func defaultDockerImage(imageName string) string {
Expand Down
1 change: 0 additions & 1 deletion pkg/cortex/serve/cortex_internal/lib/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,4 @@
# limitations under the License.

from cortex_internal.lib.api.predictor import Predictor
from cortex_internal.lib.api.monitoring import Monitoring
from cortex_internal.lib.api.api import API, get_api, get_spec
48 changes: 1 addition & 47 deletions pkg/cortex/serve/cortex_internal/lib/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from typing import Any, Dict, Optional, Tuple, Union

import datadog
from cortex_internal.lib.api import Monitoring, Predictor
from cortex_internal.lib.api import Predictor
from cortex_internal.lib.exceptions import CortexException
from cortex_internal.lib.storage import LocalStorage, S3, GCS
from cortex_internal.lib.log import logger
Expand Down Expand Up @@ -49,10 +49,6 @@ def __init__(
self.name = api_spec["name"]
self.predictor = Predictor(provider, api_spec, model_dir)

self.monitoring = None
if self.api_spec.get("monitoring") is not None:
self.monitoring = Monitoring(**self.api_spec["monitoring"])

if provider != "local":
host_ip = os.environ["HOST_IP"]
datadog.initialize(statsd_host=host_ip, statsd_port="8125")
Expand All @@ -65,24 +61,6 @@ def __init__(
def server_side_batching_enabled(self):
return self.api_spec["predictor"].get("server_side_batching") is not None

def get_cached_classes(self):
    """Return the set of class names previously uploaded for this API.

    Class names are stored as url-safe base64-encoded object keys under
    ``<metadata_root>/classes/``; each key's final path segment is decoded
    back into its original class name.
    """
    prefix = os.path.join(self.metadata_root, "classes") + "/"
    class_paths, _ = self.storage.search(prefix=prefix)
    return {
        base64.urlsafe_b64decode(class_path.split("/")[-1].encode()).decode()
        for class_path in class_paths
    }

def upload_class(self, class_name: str):
    """Persist a class name to storage so it can be rediscovered later.

    The name is ascii-encoded first (cloudwatch only supports ascii) and
    stored as a url-safe base64 object key under
    ``<metadata_root>/classes/``.

    Raises:
        ValueError: if encoding or the storage write fails.
    """
    try:
        ascii_encoded = class_name.encode("ascii")  # cloudwatch only supports ascii
        key = os.path.join(
            self.metadata_root, "classes", base64.urlsafe_b64encode(ascii_encoded).decode()
        )
        self.storage.put_json("", key)
    except Exception as e:
        raise ValueError("unable to store class {}".format(class_name)) from e

def metric_dimensions_with_id(self):
return [
{"Name": "APIName", "Value": self.name},
Expand All @@ -106,14 +84,6 @@ def post_request_metrics(self, status_code, total_time):
]
self.post_metrics(metrics)

def post_monitoring_metrics(self, prediction_value=None):
    """Emit prediction metrics for a single prediction.

    Posts the metric twice: once with the base dimensions and once with
    the API-id dimensions. No-op when ``prediction_value`` is None.
    """
    if prediction_value is None:
        return
    self.post_metrics(
        [
            self.prediction_metrics(self.metric_dimensions(), prediction_value),
            self.prediction_metrics(self.metric_dimensions_with_id(), prediction_value),
        ]
    )

def post_metrics(self, metrics):
try:
if self.statsd is None:
Expand Down Expand Up @@ -168,22 +138,6 @@ def latency_metric(self, dimensions, total_time):
"Value": total_time, # milliseconds
}

def prediction_metrics(self, dimensions, prediction_value):
    """Build a cloudwatch "Prediction" metric dict for one prediction.

    For classification models, the predicted class is appended as an extra
    dimension and each prediction counts as one occurrence. For all other
    model types, the predicted value itself is reported as a float.
    """
    if self.monitoring.model_type == "classification":
        dimensions_with_class = dimensions + [{"Name": "Class", "Value": str(prediction_value)}]
        return {
            "MetricName": "Prediction",
            "Dimensions": dimensions_with_class,
            "Unit": "Count",
            "Value": 1,
        }
    return {
        "MetricName": "Prediction",
        "Dimensions": dimensions,
        "Value": float(prediction_value),
    }


def get_api(
provider: str,
Expand Down
54 changes: 0 additions & 54 deletions pkg/cortex/serve/cortex_internal/lib/api/monitoring.py

This file was deleted.

30 changes: 0 additions & 30 deletions pkg/cortex/serve/cortex_internal/serve/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
from cortex_internal.lib.exceptions import UserRuntimeException
from fastapi import FastAPI
from fastapi.exceptions import RequestValidationError
from starlette.background import BackgroundTasks
from starlette.exceptions import HTTPException as StarletteHTTPException
from starlette.requests import Request
from starlette.responses import JSONResponse, PlainTextResponse, Response
Expand All @@ -62,7 +61,6 @@
"dynamic_batcher": None,
"predict_route": None,
"client": None,
"class_set": set(),
}


Expand Down Expand Up @@ -191,8 +189,6 @@ async def parse_payload(request: Request, call_next):


def predict(request: Request):
tasks = BackgroundTasks()
api = local_cache["api"]
predictor_impl = local_cache["predictor_impl"]
dynamic_batcher = local_cache["dynamic_batcher"]
kwargs = build_predict_kwargs(request)
Expand All @@ -219,26 +215,10 @@ def predict(request: Request):
) from e
response = Response(content=json_string, media_type="application/json")

if local_cache["provider"] not in ["local", "gcp"] and api.monitoring is not None:
try:
predicted_value = api.monitoring.extract_predicted_value(prediction)
api.post_monitoring_metrics(predicted_value)
if (
api.monitoring.model_type == "classification"
and predicted_value not in local_cache["class_set"]
):
tasks.add_task(api.upload_class, class_name=predicted_value)
local_cache["class_set"].add(predicted_value)
except:
logger.warn("unable to record prediction metric", exc_info=True)

if util.has_method(predictor_impl, "post_predict"):
kwargs = build_post_predict_kwargs(prediction, request)
request_thread_pool.submit(predictor_impl.post_predict, **kwargs)

if len(tasks.tasks) > 0:
response.background = tasks

return response


Expand Down Expand Up @@ -355,16 +335,6 @@ def start_fn():
logger.exception("failed to start api")
sys.exit(1)

if (
provider != "local"
and api.monitoring is not None
and api.monitoring.model_type == "classification"
):
try:
local_cache["class_set"] = api.get_cached_classes()
except:
logger.warn("an error occurred while attempting to load classes", exc_info=True)

app.add_api_route(local_cache["predict_route"], predict, methods=["POST"])
app.add_api_route(local_cache["predict_route"], get_summary, methods=["GET"])

Expand Down
Loading