Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 39 additions & 12 deletions cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@ import (
func main() {
var (
port int
adminPort int
inCluster bool
autoscalerURL string
namespace string
clusterConfigPath string
)

flag.IntVar(&port, "port", 8000, "port where the activator server will be exposed")
flag.IntVar(&adminPort, "admin-port", 15000, "port where the admin server will be exposed")
flag.BoolVar(&inCluster, "in-cluster", false, "use when autoscaler runs in-cluster")
flag.StringVar(&autoscalerURL, "autoscaler-url", "", "the URL for the cortex autoscaler endpoint")
flag.StringVar(&namespace, "namespace", os.Getenv("CORTEX_NAMESPACE"),
Expand Down Expand Up @@ -114,6 +116,8 @@ func main() {
kubeClient := k8sClient.ClientSet()
autoscalerClient := autoscaler.NewClient(autoscalerURL)

prometheusStatsReporter := activator.NewPrometheusStatsReporter()

istioInformerFactory := istioinformers.NewSharedInformerFactoryWithOptions(
istioClient, 10*time.Second, // TODO: check how much makes sense
istioinformers.WithNamespace(namespace),
Expand All @@ -129,12 +133,29 @@ func main() {
)
deploymentInformer := kubeInformerFactory.Apps().V1().Deployments().Informer()

act := activator.New(virtualServiceClient, deploymentInformer, virtualServiceInformer, autoscalerClient, log)
act := activator.New(
virtualServiceClient,
deploymentInformer,
virtualServiceInformer,
autoscalerClient,
prometheusStatsReporter,
log,
)

handler := activator.NewHandler(act, log)
server := &http.Server{
Addr: ":" + strconv.Itoa(port),
Handler: handler,

adminHandler := http.NewServeMux()
adminHandler.Handle("/metrics", prometheusStatsReporter)

servers := map[string]*http.Server{
"activator": {
Addr: ":" + strconv.Itoa(port),
Handler: handler,
},
"admin": {
Addr: ":" + strconv.Itoa(adminPort),
Handler: adminHandler,
},
}

stopCh := make(chan struct{})
Expand All @@ -145,10 +166,12 @@ func main() {
}()

errCh := make(chan error)
go func() {
log.Infof("Starting activator server on %s", server.Addr)
errCh <- server.ListenAndServe()
}()
for name, server := range servers {
go func(name string, server *http.Server) {
log.Infof("Starting %s server on %s", name, server.Addr)
errCh <- server.ListenAndServe()
}(name, server)
}

sigint := make(chan os.Signal, 1)
signal.Notify(sigint, os.Interrupt)
Expand All @@ -159,10 +182,14 @@ func main() {
case <-sigint:
// We received an interrupt signal, shut down.
log.Info("Received TERM signal, handling a graceful shutdown...")
log.Info("Shutting down server")
if err = server.Shutdown(context.Background()); err != nil {
// Error from closing listeners, or context timeout:
log.Warnw("HTTP server Shutdown Error", zap.Error(err))

for name, server := range servers {
log.Infof("Shutting down %s server", name)
if err = server.Shutdown(context.Background()); err != nil {
// Error from closing listeners, or context timeout:
log.Warnw("HTTP server Shutdown Error", zap.Error(err))
telemetry.Error(errors.Wrap(err, "HTTP server Shutdown Error"))
}
}
log.Info("Shutdown complete, exiting...")
}
Expand Down
3 changes: 1 addition & 2 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,13 @@ func main() {

for name, server := range servers {
log.Infof("Shutting down %s server", name)
if err := server.Shutdown(context.Background()); err != nil {
if err = server.Shutdown(context.Background()); err != nil {
// Error from closing listeners, or context timeout:
log.Warnw("HTTP server Shutdown Error", zap.Error(err))
telemetry.Error(errors.Wrap(err, "HTTP server Shutdown Error"))
}
}
log.Info("Shutdown complete, exiting...")
telemetry.Close()
}
}

Expand Down
5 changes: 4 additions & 1 deletion manager/manifests/activator.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ spec:
- "--namespace=default"
- "--cluster-config=/configs/cluster/cluster.yaml"
ports:
- containerPort: 8000
- name: http
containerPort: 8000
- name: admin
containerPort: 15000
livenessProbe:
httpGet:
port: 8000
Expand Down
6 changes: 3 additions & 3 deletions manager/manifests/grafana/grafana-dashboard-realtime.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ data:
"steppedLine": false,
"targets": [
{
"expr": "count(cortex_in_flight_requests{api_name=~\"$api_name\"}) by (api_name)",
"expr": "count(cortex_in_flight_requests{api_name=~\"$api_name\", container!=\"activator\"}) by (api_name)",
"interval": "",
"legendFormat": "{{api_name}}",
"refId": "Active Replicas"
Expand Down Expand Up @@ -2027,7 +2027,7 @@ data:
"value": "None"
},
"datasource": null,
"definition": "label_values(cortex_in_flight_requests{api_kind=\"RealtimeAPI\"}, api_name)",
"definition": "label_values(cortex_in_flight_requests, api_name)",
"description": null,
"error": null,
"hide": 0,
Expand All @@ -2037,7 +2037,7 @@ data:
"name": "api_name",
"options": [],
"query": {
"query": "label_values(cortex_in_flight_requests{api_kind=\"RealtimeAPI\"}, api_name)",
"query": "label_values(cortex_in_flight_requests, api_name)",
"refId": "StandardVariableQuery"
},
"refresh": 1,
Expand Down
44 changes: 44 additions & 0 deletions manager/manifests/prometheus-monitoring.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -347,3 +347,47 @@ spec:
selector:
matchLabels:
cortex.dev/name: operator

---

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
name: activator-stats
labels:
monitoring.cortex.dev: "activator"
spec:
selector:
matchLabels:
app: activator
matchExpressions:
- { key: prometheus-ignore, operator: DoesNotExist }
namespaceSelector:
any: true
jobLabel: activator-stats
podMetricsEndpoints:
- path: /metrics
scheme: http
interval: 10s
port: admin
relabelings:
- action: keep
sourceLabels: [ __meta_kubernetes_pod_container_name ]
regex: "activator"
- sourceLabels: [ __address__, __meta_kubernetes_pod_annotation_prometheus_io_port ]
action: replace
regex: ([^:]+)(?::\d+)?;(\d+)
replacement: $1:$2
targetLabel: __address__
- action: labeldrop
regex: "__meta_kubernetes_pod_label_(.+)"
- sourceLabels: [ __meta_kubernetes_namespace ]
action: replace
targetLabel: namespace
- sourceLabels: [ __meta_kubernetes_pod_name ]
action: replace
targetLabel: pod_name
metricRelabelings:
- action: keep
sourceLabels: [__name__]
regex: "cortex_(.+)"
12 changes: 12 additions & 0 deletions pkg/activator/activator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ type ctxValue string

const APINameCtxKey ctxValue = "apiName"

type StatsReporter interface {
AddAPI(apiName string)
RemoveAPI(apiName string)
}

type Activator interface {
Try(ctx context.Context, fn func() error) error
}
Expand All @@ -47,6 +52,7 @@ type activator struct {
apiActivators map[string]*apiActivator
readinessTrackers map[string]*readinessTracker
istioClient istionetworkingclient.VirtualServiceInterface
reporter StatsReporter
logger *zap.SugaredLogger
}

Expand All @@ -55,6 +61,7 @@ func New(
deploymentInformer cache.SharedIndexInformer,
virtualServiceInformer cache.SharedIndexInformer,
autoscalerClient autoscaler.Client,
reporter StatsReporter,
logger *zap.SugaredLogger,
) Activator {
log := logger.With(zap.String("apiKind", userconfig.RealtimeAPIKind.String()))
Expand All @@ -65,6 +72,7 @@ func New(
istioClient: istioClient,
logger: log,
autoscalerClient: autoscalerClient,
reporter: reporter,
}

virtualServiceInformer.AddEventHandler(
Expand Down Expand Up @@ -169,6 +177,8 @@ func (a *activator) addAPI(obj interface{}) {
a.apiActivators[apiName] = newAPIActivator(apiMetadata.maxQueueLength, apiMetadata.maxConcurrency)
}
a.activatorsMux.Unlock()

a.reporter.AddAPI(apiName)
}

func (a *activator) updateAPI(oldObj interface{}, newObj interface{}) {
Expand Down Expand Up @@ -218,6 +228,8 @@ func (a *activator) removeAPI(obj interface{}) {
a.activatorsMux.Lock()
delete(a.apiActivators, apiMetadata.apiName)
a.activatorsMux.Unlock()

a.reporter.RemoveAPI(apiMetadata.apiName)
}

func (a *activator) awakenAPI(apiName string) {
Expand Down
54 changes: 54 additions & 0 deletions pkg/activator/request_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
Copyright 2021 Cortex Labs, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package activator

import (
"net/http"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

type PrometheusStatsReporter struct {
handler http.Handler
inFlightRequests *prometheus.GaugeVec
}

func NewPrometheusStatsReporter() *PrometheusStatsReporter {
inFlightRequestsGauge := promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_in_flight_requests",
Help: "The number of in-flight requests for a cortex API",
}, []string{"api_name"})

return &PrometheusStatsReporter{
handler: promhttp.Handler(),
inFlightRequests: inFlightRequestsGauge,
}
}

func (r *PrometheusStatsReporter) AddAPI(apiName string) {
r.inFlightRequests.WithLabelValues(apiName).Set(0)
}

func (r *PrometheusStatsReporter) RemoveAPI(apiName string) {
r.inFlightRequests.DeleteLabelValues(apiName)
}

func (r *PrometheusStatsReporter) ServeHTTP(w http.ResponseWriter, req *http.Request) {
r.handler.ServeHTTP(w, req)
}
4 changes: 2 additions & 2 deletions pkg/autoscaler/realtime_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ func (s *RealtimeScaler) GetInFlightRequests(apiName string, window time.Duratio

// PromQL query:
// sum(sum_over_time(cortex_in_flight_requests{api_name="<apiName>"}[60s])) /
// sum(count_over_time(cortex_in_flight_requests{api_name="<apiName>"}[60s]))
// sum(count_over_time(cortex_in_flight_requests{api_name="<apiName>", container!="activator"}[60s]))
query := fmt.Sprintf(
"sum(sum_over_time(cortex_in_flight_requests{api_name=\"%s\"}[%ds])) / "+
"max(count_over_time(cortex_in_flight_requests{api_name=\"%s\"}[%ds]))",
"max(count_over_time(cortex_in_flight_requests{api_name=\"%s\", container!=\"activator\"}[%ds]))",
apiName, windowSeconds,
apiName, windowSeconds,
)
Expand Down