From 59c5168043c81faa4013e96a866830c680ade9c3 Mon Sep 17 00:00:00 2001 From: Miguel Varela Ramos Date: Thu, 15 Jul 2021 19:12:47 +0200 Subject: [PATCH] Add admin server to activator and make sure in-flight are initialized to zero when min_replicas=0 --- cmd/activator/main.go | 51 +++++++++++++----- cmd/proxy/main.go | 3 +- manager/manifests/activator.yaml.j2 | 5 +- .../grafana/grafana-dashboard-realtime.yaml | 6 +-- manager/manifests/prometheus-monitoring.yaml | 44 +++++++++++++++ pkg/activator/activator.go | 12 +++++ pkg/activator/request_stats.go | 54 +++++++++++++++++++ pkg/autoscaler/realtime_scaler.go | 4 +- 8 files changed, 159 insertions(+), 20 deletions(-) create mode 100644 pkg/activator/request_stats.go diff --git a/cmd/activator/main.go b/cmd/activator/main.go index dbae4b021f..7396bb890e 100644 --- a/cmd/activator/main.go +++ b/cmd/activator/main.go @@ -44,6 +44,7 @@ import ( func main() { var ( port int + adminPort int inCluster bool autoscalerURL string namespace string @@ -51,6 +52,7 @@ func main() { ) flag.IntVar(&port, "port", 8000, "port where the activator server will be exposed") + flag.IntVar(&adminPort, "admin-port", 15000, "port where the admin server will be exposed") flag.BoolVar(&inCluster, "in-cluster", false, "use when autoscaler runs in-cluster") flag.StringVar(&autoscalerURL, "autoscaler-url", "", "the URL for the cortex autoscaler endpoint") flag.StringVar(&namespace, "namespace", os.Getenv("CORTEX_NAMESPACE"), @@ -114,6 +116,8 @@ func main() { kubeClient := k8sClient.ClientSet() autoscalerClient := autoscaler.NewClient(autoscalerURL) + prometheusStatsReporter := activator.NewPrometheusStatsReporter() + istioInformerFactory := istioinformers.NewSharedInformerFactoryWithOptions( istioClient, 10*time.Second, // TODO: check how much makes sense istioinformers.WithNamespace(namespace), @@ -129,12 +133,29 @@ func main() { ) deploymentInformer := kubeInformerFactory.Apps().V1().Deployments().Informer() - act := activator.New(virtualServiceClient, deploymentInformer, virtualServiceInformer, autoscalerClient, log) + act := activator.New( + virtualServiceClient, + deploymentInformer, + virtualServiceInformer, + autoscalerClient, + prometheusStatsReporter, + log, + ) handler := activator.NewHandler(act, log) - server := &http.Server{ - Addr: ":" + strconv.Itoa(port), - Handler: handler, + + adminHandler := http.NewServeMux() + adminHandler.Handle("/metrics", prometheusStatsReporter) + + servers := map[string]*http.Server{ + "activator": { + Addr: ":" + strconv.Itoa(port), + Handler: handler, + }, + "admin": { + Addr: ":" + strconv.Itoa(adminPort), + Handler: adminHandler, + }, } stopCh := make(chan struct{}) @@ -145,10 +166,12 @@ func main() { }() errCh := make(chan error) - go func() { - log.Infof("Starting activator server on %s", server.Addr) - errCh <- server.ListenAndServe() - }() + for name, server := range servers { + go func(name string, server *http.Server) { + log.Infof("Starting %s server on %s", name, server.Addr) + errCh <- server.ListenAndServe() + }(name, server) + } sigint := make(chan os.Signal, 1) signal.Notify(sigint, os.Interrupt) @@ -159,10 +182,14 @@ func main() { case <-sigint: // We received an interrupt signal, shut down. log.Info("Received TERM signal, handling a graceful shutdown...") - log.Info("Shutting down server") - if err = server.Shutdown(context.Background()); err != nil { - // Error from closing listeners, or context timeout: - log.Warnw("HTTP server Shutdown Error", zap.Error(err)) + + for name, server := range servers { + log.Infof("Shutting down %s server", name) + if err = server.Shutdown(context.Background()); err != nil { + // Error from closing listeners, or context timeout: + log.Warnw("HTTP server Shutdown Error", zap.Error(err)) + telemetry.Error(errors.Wrap(err, "HTTP server Shutdown Error")) + } } log.Info("Shutdown complete, exiting...") } diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index 53306173ea..e7575eaf25 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -175,14 +175,13 @@ func main() { for name, server := range servers { log.Infof("Shutting down %s server", name) - if err := server.Shutdown(context.Background()); err != nil { + if err = server.Shutdown(context.Background()); err != nil { // Error from closing listeners, or context timeout: log.Warnw("HTTP server Shutdown Error", zap.Error(err)) telemetry.Error(errors.Wrap(err, "HTTP server Shutdown Error")) } } log.Info("Shutdown complete, exiting...") - telemetry.Close() } } diff --git a/manager/manifests/activator.yaml.j2 b/manager/manifests/activator.yaml.j2 index 6901c25b1d..eeea185b0c 100644 --- a/manager/manifests/activator.yaml.j2 +++ b/manager/manifests/activator.yaml.j2 @@ -83,7 +83,10 @@ spec: - "--namespace=default" - "--cluster-config=/configs/cluster/cluster.yaml" ports: - - containerPort: 8000 + - name: http + containerPort: 8000 + - name: admin + containerPort: 15000 livenessProbe: httpGet: port: 8000 diff --git a/manager/manifests/grafana/grafana-dashboard-realtime.yaml b/manager/manifests/grafana/grafana-dashboard-realtime.yaml index f76d4cdc9e..97e91cc318 100644 --- a/manager/manifests/grafana/grafana-dashboard-realtime.yaml +++ b/manager/manifests/grafana/grafana-dashboard-realtime.yaml @@ -427,7 +427,7 @@ data: "steppedLine": false, "targets": [ { - "expr": "count(cortex_in_flight_requests{api_name=~\"$api_name\"}) by (api_name)", + "expr": "count(cortex_in_flight_requests{api_name=~\"$api_name\", container!=\"activator\"}) by (api_name)", "interval": "", "legendFormat": "{{api_name}}", "refId": "Active Replicas" @@ -2027,7 +2027,7 @@ data: "value": "None" }, "datasource": null, - "definition": "label_values(cortex_in_flight_requests{api_kind=\"RealtimeAPI\"}, api_name)", + "definition": "label_values(cortex_in_flight_requests, api_name)", "description": null, "error": null, "hide": 0, @@ -2037,7 +2037,7 @@ data: "name": "api_name", "options": [], "query": { - "query": "label_values(cortex_in_flight_requests{api_kind=\"RealtimeAPI\"}, api_name)", + "query": "label_values(cortex_in_flight_requests, api_name)", "refId": "StandardVariableQuery" }, "refresh": 1, diff --git a/manager/manifests/prometheus-monitoring.yaml b/manager/manifests/prometheus-monitoring.yaml index 0982504aff..3935ef0bea 100644 --- a/manager/manifests/prometheus-monitoring.yaml +++ b/manager/manifests/prometheus-monitoring.yaml @@ -339,3 +339,47 @@ spec: selector: matchLabels: cortex.dev/name: operator + +--- + +apiVersion: monitoring.coreos.com/v1 +kind: PodMonitor +metadata: + name: activator-stats + labels: + monitoring.cortex.dev: "activator" +spec: + selector: + matchLabels: + app: activator + matchExpressions: + - { key: prometheus-ignore, operator: DoesNotExist } + namespaceSelector: + any: true + jobLabel: activator-stats + podMetricsEndpoints: + - path: /metrics + scheme: http + interval: 10s + port: admin + relabelings: + - action: keep + sourceLabels: [ __meta_kubernetes_pod_container_name ] + regex: "activator" + - sourceLabels: [ __address__, __meta_kubernetes_pod_annotation_prometheus_io_port ] + action: replace + regex: ([^:]+)(?::\d+)?;(\d+) + replacement: $1:$2 + targetLabel: __address__ + - action: labeldrop + regex: "__meta_kubernetes_pod_label_(.+)" + - sourceLabels: [ __meta_kubernetes_namespace ] + action: replace + targetLabel: namespace + - sourceLabels: [ __meta_kubernetes_pod_name ] + action: replace + targetLabel: pod_name + metricRelabelings: + - action: keep + sourceLabels: [__name__] + regex: "cortex_(.+)" diff --git a/pkg/activator/activator.go b/pkg/activator/activator.go index 552acb15a0..b7c54adc3d 100644 --- a/pkg/activator/activator.go +++ b/pkg/activator/activator.go @@ -36,6 +36,11 @@ type ctxValue string const APINameCtxKey ctxValue = "apiName" +type StatsReporter interface { + AddAPI(apiName string) + RemoveAPI(apiName string) +} + type Activator interface { Try(ctx context.Context, fn func() error) error } @@ -47,6 +52,7 @@ type activator struct { apiActivators map[string]*apiActivator readinessTrackers map[string]*readinessTracker istioClient istionetworkingclient.VirtualServiceInterface + reporter StatsReporter logger *zap.SugaredLogger } @@ -55,6 +61,7 @@ func New( deploymentInformer cache.SharedIndexInformer, virtualServiceInformer cache.SharedIndexInformer, autoscalerClient autoscaler.Client, + reporter StatsReporter, logger *zap.SugaredLogger, ) Activator { log := logger.With(zap.String("apiKind", userconfig.RealtimeAPIKind.String())) @@ -65,6 +72,7 @@ func New( istioClient: istioClient, logger: log, autoscalerClient: autoscalerClient, + reporter: reporter, } virtualServiceInformer.AddEventHandler( @@ -169,6 +177,8 @@ func (a *activator) addAPI(obj interface{}) { a.apiActivators[apiName] = newAPIActivator(apiMetadata.maxQueueLength, apiMetadata.maxConcurrency) } a.activatorsMux.Unlock() + + a.reporter.AddAPI(apiName) } func (a *activator) updateAPI(oldObj interface{}, newObj interface{}) { @@ -218,6 +228,8 @@ func (a *activator) removeAPI(obj interface{}) { a.activatorsMux.Lock() delete(a.apiActivators, apiMetadata.apiName) a.activatorsMux.Unlock() + + a.reporter.RemoveAPI(apiMetadata.apiName) } func (a *activator) awakenAPI(apiName string) { diff --git a/pkg/activator/request_stats.go b/pkg/activator/request_stats.go new file mode 100644 index 0000000000..3ea8f4c2db --- /dev/null +++ b/pkg/activator/request_stats.go @@ -0,0 +1,54 @@ +/* +Copyright 2021 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package activator + +import ( + "net/http" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +type PrometheusStatsReporter struct { + handler http.Handler + inFlightRequests *prometheus.GaugeVec +} + +func NewPrometheusStatsReporter() *PrometheusStatsReporter { + inFlightRequestsGauge := promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_in_flight_requests", + Help: "The number of in-flight requests for a cortex API", + }, []string{"api_name"}) + + return &PrometheusStatsReporter{ + handler: promhttp.Handler(), + inFlightRequests: inFlightRequestsGauge, + } +} + +func (r *PrometheusStatsReporter) AddAPI(apiName string) { + r.inFlightRequests.WithLabelValues(apiName).Set(0) +} + +func (r *PrometheusStatsReporter) RemoveAPI(apiName string) { + r.inFlightRequests.DeleteLabelValues(apiName) +} + +func (r *PrometheusStatsReporter) ServeHTTP(w http.ResponseWriter, req *http.Request) { + r.handler.ServeHTTP(w, req) +} diff --git a/pkg/autoscaler/realtime_scaler.go b/pkg/autoscaler/realtime_scaler.go index bd744193b9..4068d0343c 100644 --- a/pkg/autoscaler/realtime_scaler.go +++ b/pkg/autoscaler/realtime_scaler.go @@ -101,10 +101,10 @@ func (s *RealtimeScaler) GetInFlightRequests(apiName string, window time.Duratio // PromQL query: // sum(sum_over_time(cortex_in_flight_requests{api_name=""}[60s])) / - // sum(count_over_time(cortex_in_flight_requests{api_name=""}[60s])) + // sum(count_over_time(cortex_in_flight_requests{api_name="", container!="activator"}[60s])) query := fmt.Sprintf( "sum(sum_over_time(cortex_in_flight_requests{api_name=\"%s\"}[%ds])) / "+ - "max(count_over_time(cortex_in_flight_requests{api_name=\"%s\"}[%ds]))", + "max(count_over_time(cortex_in_flight_requests{api_name=\"%s\", container!=\"activator\"}[%ds]))", apiName, windowSeconds, apiName, windowSeconds, )