Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fission metrics integration #677

Merged
merged 15 commits into from May 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions charts/fission-all/templates/deployment.yaml
Expand Up @@ -168,6 +168,10 @@ spec:
labels:
application: fission-router
svc: router
annotations:
prometheus.io/scrape: "true"
prometheus.io/path: "/metrics"
prometheus.io/port: "8080"
spec:
containers:
- name: router
Expand All @@ -188,6 +192,11 @@ spec:
port: 8888
initialDelaySeconds: 35
periodSeconds: 5
ports:
- containerPort: 8080
name: metrics
- containerPort: 8888
name: http
serviceAccount: fission-svc

---
Expand Down Expand Up @@ -219,6 +228,10 @@ spec:
metadata:
labels:
svc: executor
annotations:
prometheus.io/scrape: "true"
prometheus.io/path: "/metrics"
prometheus.io/port: "8080"
spec:
containers:
- name: executor
Expand Down Expand Up @@ -248,6 +261,11 @@ spec:
port: 8888
initialDelaySeconds: 35
periodSeconds: 5
ports:
- containerPort: 8080
name: metrics
- containerPort: 8888
name: http
serviceAccount: fission-svc

---
Expand Down
1 change: 1 addition & 0 deletions executor/api.go
Expand Up @@ -89,6 +89,7 @@ func (executor *Executor) getServiceForFunction(m *metav1.ObjectMeta) (string, e
if resp.err != nil {
return "", resp.err
}
executor.fsCache.IncreaseColdStarts(m.Name, string(m.UID))
return resp.funcSvc.Address, resp.err
}

Expand Down
14 changes: 12 additions & 2 deletions executor/executor.go
Expand Up @@ -18,12 +18,14 @@ package executor

import (
"log"
"net/http"
"runtime/debug"
"strings"
"sync"
"time"

"github.com/dchest/uniuri"
"github.com/prometheus/client_golang/prometheus/promhttp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/fission/fission"
Expand Down Expand Up @@ -202,8 +204,15 @@ func dumpStackTrace() {
debug.PrintStack()
}

// StartExecutor Starts executor and the executor components such as Poolmgr,
// deploymgr and potential future executor types
// serveMetric exposes the registered Prometheus metrics over HTTP at
// /metrics on port 8080. It blocks while the server is running and ends the
// process through log.Fatal when ListenAndServe returns, so it is meant to be
// started in its own goroutine.
func serveMetric() {
	metricAddr := ":8080"

	// Serve from a dedicated mux instead of http.DefaultServeMux so the
	// metrics port exposes only /metrics and not any handler that other
	// packages may have registered on the process-wide default mux.
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(metricAddr, mux))
}

// StartExecutor starts the executor and the backend components it uses, such as
// poolmgr, deploymgr and potential future backends.
func StartExecutor(fissionNamespace string, functionNamespace string, port int) error {
// setup a signal handler for SIGTERM
fission.SetupStackTraceHandler()
Expand Down Expand Up @@ -238,6 +247,7 @@ func StartExecutor(fissionNamespace string, functionNamespace string, port int)
api := MakeExecutor(gpm, ndm, fissionClient, fsCache)

go api.Serve(port)
go serveMetric()

return nil
}
5 changes: 5 additions & 0 deletions executor/fscache/functionServiceCache.go
Expand Up @@ -216,6 +216,7 @@ func (fsc *FunctionServiceCache) Add(fsvc FuncSvc) (*FuncSvc, error) {
return nil, err
}

fsc.setFuncAlive(fsvc.Function.Name, string(fsvc.Function.UID), true)
return nil, nil
}

Expand Down Expand Up @@ -249,6 +250,10 @@ func (fsc *FunctionServiceCache) DeleteEntry(fsvc *FuncSvc) {
fsc.byFunction.Delete(crd.CacheKey(fsvc.Function))
fsc.byAddress.Delete(fsvc.Address)
fsc.byFunctionUID.Delete(fsvc.Function.UID)

fsc.observeFuncRunningTime(fsvc.Function.Name, string(fsvc.Function.UID), fsvc.Atime.Sub(fsvc.Ctime).Seconds())
fsc.observeFuncAliveTime(fsvc.Function.Name, string(fsvc.Function.UID), time.Now().Sub(fsvc.Ctime).Seconds())
fsc.setFuncAlive(fsvc.Function.Name, string(fsvc.Function.UID), false)
}

func (fsc *FunctionServiceCache) DeleteOld(fsvc *FuncSvc, minAge time.Duration) (bool, error) {
Expand Down
70 changes: 70 additions & 0 deletions executor/fscache/metrics.go
@@ -0,0 +1,70 @@
package fscache

import (
"github.com/prometheus/client_golang/prometheus"
)

// metricAddr is a listen address on port 8080.
// NOTE(review): nothing in the visible part of this package reads it — confirm it is still needed.
var metricAddr = ":8080"

// All collectors below share the same two labels:
//   funcname: the function's name
//   funcuid:  the function's version id

// coldStarts counts cold starts per function.
var coldStarts = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "fission_cold_starts_total",
		Help: "How many cold starts are made by funcname, funcuid.",
	},
	[]string{"funcname", "funcuid"},
)

// funcRunningSummary summarises, in seconds, the span between a function
// service's creation and its last access.
var funcRunningSummary = prometheus.NewSummaryVec(
	prometheus.SummaryOpts{
		Name: "fission_func_running_seconds_summary",
		Help: "The running time (last access - create) in seconds of the function.",
		Objectives: map[float64]float64{
			0.5:  0.05,
			0.9:  0.01,
			0.99: 0.001,
		},
	},
	[]string{"funcname", "funcuid"},
)

// funcAliveSummary summarises, in seconds, how long a function was alive.
var funcAliveSummary = prometheus.NewSummaryVec(
	prometheus.SummaryOpts{
		Name: "fission_func_alive_seconds_summary",
		Help: "The alive time in seconds of the function.",
		Objectives: map[float64]float64{
			0.5:  0.05,
			0.9:  0.01,
			0.99: 0.001,
		},
	},
	[]string{"funcname", "funcuid"},
)

// funcIsAlive is a 0/1 gauge per function: 1 while alive, 0 otherwise.
var funcIsAlive = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "fission_func_is_alive",
		Help: "A binary value indicating is the funcname, funcuid alive",
	},
	[]string{"funcname", "funcuid"},
)

// init registers every collector declared in this package with Prometheus's
// default registry. MustRegister panics if a registration fails.
func init() {
	prometheus.MustRegister(
		coldStarts,
		funcRunningSummary,
		funcAliveSummary,
		funcIsAlive,
	)
}

// IncreaseColdStarts adds one to the cold-start counter labelled with the
// given function name and UID.
func (fsc *FunctionServiceCache) IncreaseColdStarts(funcname, funcuid string) {
	counter := coldStarts.WithLabelValues(funcname, funcuid)
	counter.Inc()
}

// observeFuncRunningTime records one running-time sample (in seconds) in the
// running-time summary for the given function name and UID.
func (fsc *FunctionServiceCache) observeFuncRunningTime(funcname, funcuid string, running float64) {
	summary := funcRunningSummary.WithLabelValues(funcname, funcuid)
	summary.Observe(running)
}

// observeFuncAliveTime records one alive-time sample (in seconds) in the
// alive-time summary for the given function name and UID.
func (fsc *FunctionServiceCache) observeFuncAliveTime(funcname, funcuid string, alive float64) {
	summary := funcAliveSummary.WithLabelValues(funcname, funcuid)
	summary.Observe(alive)
}

// setFuncAlive sets the is-alive gauge for the given function name and UID
// to 1 when isAlive is true and to 0 otherwise.
func (fsc *FunctionServiceCache) setFuncAlive(funcname, funcuid string, isAlive bool) {
	var state float64
	if isAlive {
		state = 1
	}
	funcIsAlive.WithLabelValues(funcname, funcuid).Set(state)
}
31 changes: 31 additions & 0 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions glide.yaml
Expand Up @@ -62,3 +62,6 @@ import:
version: ~0.3.2
- package: github.com/hashicorp/go-multierror
- package: github.com/hashicorp/errwrap
- package: github.com/prometheus/client_golang
version: v0.8.0

36 changes: 30 additions & 6 deletions router/functionHandler.go
Expand Up @@ -29,13 +29,15 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/fission/fission"
"github.com/fission/fission/crd"
executorClient "github.com/fission/fission/executor/client"
)

// functionHandler bundles what the router needs to forward requests to one
// function.
type functionHandler struct {
	// fmap is the router's function-service map.
	fmap *functionServiceMap
	// executor is the client used to talk to the executor.
	executor *executorClient.Client
	// function is the metadata of the function being served.
	function *metav1.ObjectMeta
	// httpTrigger is the HTTP trigger that routes to this function. It may be
	// nil, in which case the host and path metric labels are left unset.
	httpTrigger *crd.HTTPTrigger
}

// A layer on top of http.DefaultTransport, with retries.
Expand Down Expand Up @@ -75,6 +77,20 @@ func (roundTripper RetryingRoundTripper) RoundTrip(req *http.Request) (resp *htt
var needExecutor, serviceUrlFromExecutor bool
var serviceUrl *url.URL

// Metrics stuff
startTime := time.Now()
funcMetricLabels := &functionLabels{
namespace: roundTripper.funcHandler.function.Namespace,
name: roundTripper.funcHandler.function.Name,
}
httpMetricLabels := &httpLabels{
method: req.Method,
}
if roundTripper.funcHandler.httpTrigger != nil {
httpMetricLabels.host = roundTripper.funcHandler.httpTrigger.Spec.Host
httpMetricLabels.path = roundTripper.funcHandler.httpTrigger.Spec.RelativeURL
}

// set the timeout for transport context
timeout := roundTripper.initialTimeout
transport := http.DefaultTransport.(*http.Transport)
Expand Down Expand Up @@ -135,13 +151,23 @@ func (roundTripper RetryingRoundTripper) RoundTrip(req *http.Request) (resp *htt
KeepAlive: 30 * time.Second,
}).DialContext

overhead := time.Since(startTime)

// forward the request to the function service
resp, err = transport.RoundTrip(req)
if err == nil {
// Track metrics
httpMetricLabels.code = resp.StatusCode
funcMetricLabels.cached = !serviceUrlFromExecutor

functionCallCompleted(funcMetricLabels, httpMetricLabels,
overhead, time.Since(startTime), resp.ContentLength)

// if transport.RoundTrip succeeds and it was a cached entry, then tapService
if !serviceUrlFromExecutor {
go roundTripper.funcHandler.tapService(serviceUrl)
}

// return response back to user
return resp, nil
}
Expand Down Expand Up @@ -190,11 +216,9 @@ func (fh *functionHandler) handler(responseWriter http.ResponseWriter, request *
request.Header.Add(fmt.Sprintf("X-Fission-Params-%v", k), v)
}

// System Params
// system params
MetadataToHeaders(HEADERS_FISSION_FUNCTION_PREFIX, fh.function, request)

// TODO: As an optimization we may want to cache proxies too -- this might get us
// connection reuse and possibly better performance
director := func(req *http.Request) {
if _, ok := req.Header["User-Agent"]; !ok {
// explicitly disable User-Agent so it's not set to default value
Expand Down
7 changes: 4 additions & 3 deletions router/httpTriggers.go
Expand Up @@ -117,9 +117,10 @@ func (ts *HTTPTriggerSet) getRouter() *mux.Router {
}

fh := &functionHandler{
fmap: ts.functionServiceMap,
function: rr.functionMetadata,
executor: ts.executor,
fmap: ts.functionServiceMap,
function: rr.functionMetadata,
executor: ts.executor,
httpTrigger: &trigger,
}

ht := muxRouter.HandleFunc(trigger.Spec.RelativeURL, fh.handler)
Expand Down