Skip to content

Commit

Permalink
Adds pipeline metrics
Browse files Browse the repository at this point in the history
Often, as a developer or administrator (ops), I want some insight
into pipeline behavior: the time taken to execute a pipelinerun/taskrun,
its success or failure ratio, pod latencies, etc.
At present, Tekton Pipelines has very limited ways to surface such information,
and it is hard to get those details by looking at resource YAMLs.

This patch exposes the above-mentioned pipeline metrics on the '/metrics'
endpoint using the knative `pkg/metrics` package. Users can collect these
metrics using Prometheus, Stackdriver, or another supported metrics system.

To some extent it solves
 - tektoncd#540
 - tektoncd#164
  • Loading branch information
hrishin committed Oct 7, 2019
1 parent 151f5da commit bbae434
Show file tree
Hide file tree
Showing 15 changed files with 1,639 additions and 33 deletions.
4 changes: 4 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

const (
// ControllerLogKey is the name of the logger for the controller cmd
ControllerLogKey = "controller"
ControllerLogKey = "tekton"
)

var (
Expand Down
17 changes: 17 additions & 0 deletions pkg/apis/pipeline/v1alpha1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"time"

"github.com/tektoncd/pipeline/pkg/apis/pipeline"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/apis"
Expand Down Expand Up @@ -290,3 +291,19 @@ func (tr *TaskRun) GetRunKey() string {
// The address of the pointer is a threadsafe unique identifier for the taskrun
return fmt.Sprintf("%s/%p", "TaskRun", tr)
}

// IsOfAPipeline reports whether the TaskRun carries the pipeline label, i.e.
// whether it was created as part of a pipeline. When it does, the second
// return value maps pipeline.PipelineLabelKey to the pipeline label's value
// and pipeline.PipelineRunLabelKey to the pipelinerun label's value (empty if
// that label is absent). For a nil or unlabelled TaskRun it returns
// (false, nil).
func (tr *TaskRun) IsOfAPipeline() (bool, map[string]string) {
	if tr == nil || len(tr.Labels) == 0 {
		return false, nil
	}

	pipelineName, labelled := tr.Labels[pipeline.GroupName+pipeline.PipelineLabelKey]
	if !labelled {
		return false, nil
	}

	owner := make(map[string]string)
	owner[pipeline.PipelineLabelKey] = pipelineName
	owner[pipeline.PipelineRunLabelKey] = tr.Labels[pipeline.GroupName+pipeline.PipelineRunLabelKey]

	return true, owner
}
5 changes: 5 additions & 0 deletions pkg/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func NewController(images reconciler.Images) func(context.Context, configmap.Wat
resourceInformer := resourceinformer.Get(ctx)
conditionInformer := conditioninformer.Get(ctx)
timeoutHandler := reconciler.NewTimeoutHandler(ctx.Done(), logger)
metrics, err := NewRecorder()
if err != nil {
logger.Errorf("Failed to create pipelinerun metrics recorder %v", err)
}

opt := reconciler.Options{
KubeClientSet: kubeclientset,
Expand All @@ -74,6 +78,7 @@ func NewController(images reconciler.Images) func(context.Context, configmap.Wat
resourceLister: resourceInformer.Lister(),
conditionLister: conditionInformer.Lister(),
timeoutHandler: timeoutHandler,
metrics: metrics,
}
impl := controller.NewImpl(c, c.Logger, pipelineRunControllerName)

Expand Down
157 changes: 157 additions & 0 deletions pkg/reconciler/pipelinerun/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package pipelinerun

import (
"context"
"errors"
"fmt"
"time"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1alpha1"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"knative.dev/pkg/metrics"
)

// Measures and aggregations recorded by the pipelinerun metrics Recorder.
var (
// prDuration is the measure for how long a pipelinerun executed; the metric
// name and description state the value is in seconds.
prDuration = stats.Float64(
"pipelinerun_duration_seconds",
"The pipelinerun execution time in seconds",
stats.UnitDimensionless)
// pRunDistributions holds the histogram bucket boundaries used to aggregate
// prDuration (presumably seconds, i.e. 10s up to 24h — matches the measure name).
pRunDistributions = view.Distribution(10, 30, 60, 300, 900, 1800, 3600, 5400, 10800, 21600, 43200, 86400)

// prCount is the measure recorded once per pipelinerun passed to
// DurationAndCount; its view aggregates it with view.Count().
prCount = stats.Float64("pipelinerun_count",
"number of pipelineruns",
stats.UnitDimensionless)

// runningPrsCount is the measure for the number of pipelineruns that are
// not done yet; its view keeps only the last recorded value.
runningPrsCount = stats.Float64("running_pipelineruns_count",
"Number of pipelineruns executing currently",
stats.UnitDimensionless)
)

// Recorder records pipelinerun metrics. It holds the tag keys attached to
// recorded measurements; create it with NewRecorder.
type Recorder struct {
// initialized is false for a zero-value Recorder or when view registration
// failed; the recording methods return an error instead of recording then.
initialized bool

// Tag keys created by NewRecorder ("pipeline", "pipelinerun", "namespace",
// "status").
pipeline tag.Key
pipelineRun tag.Key
namespace tag.Key
status tag.Key
}

// NewRecorder creates the tag keys used for pipelinerun metrics and registers
// the views for the duration, count and running-count measures.
//
// The returned Recorder is never nil. On any failure it is returned together
// with the error and stays uninitialized, so callers that only log the error
// and keep the recorder get an error from the recording methods rather than a
// nil-pointer dereference.
func NewRecorder() (*Recorder, error) {
	// initialized is only flipped to true after everything succeeded.
	r := &Recorder{}

	var err error
	if r.pipeline, err = tag.NewKey("pipeline"); err != nil {
		return r, err
	}
	if r.pipelineRun, err = tag.NewKey("pipelinerun"); err != nil {
		return r, err
	}
	if r.namespace, err = tag.NewKey("namespace"); err != nil {
		return r, err
	}
	if r.status, err = tag.NewKey("status"); err != nil {
		return r, err
	}

	err = view.Register(
		&view.View{
			Description: prDuration.Description(),
			Measure:     prDuration,
			Aggregation: pRunDistributions,
			TagKeys:     []tag.Key{r.pipeline, r.pipelineRun, r.namespace, r.status},
		},
		&view.View{
			Description: prCount.Description(),
			Measure:     prCount,
			Aggregation: view.Count(),
			TagKeys:     []tag.Key{r.status},
		},
		&view.View{
			Description: runningPrsCount.Description(),
			Measure:     runningPrsCount,
			Aggregation: view.LastValue(),
		},
	)
	if err != nil {
		return r, err
	}

	r.initialized = true
	return r, nil
}

// DurationAndCount records the execution time of the given pipelinerun and
// counts it, tagging the measurements with the pipeline name, pipelinerun
// name, namespace and a "success"/"failed" status.
//
// The duration runs from the start time to the completion time, or to now when
// the pipelinerun has no completion time yet. The status is "failed" only when
// the first status condition is False.
//
// It returns an error, without recording anything, when the recorder is not
// initialized, when the pipelinerun has no start time or no status condition,
// or when the tag context cannot be built.
func (r *Recorder) DurationAndCount(pr *v1alpha1.PipelineRun) error {
	if !r.initialized {
		return fmt.Errorf("ignoring the metrics recording for %s , failed to initialize the metrics recorder", pr.Name)
	}

	// Guard the dereferences below: a pipelinerun that never started or has
	// no condition yet would otherwise panic the reconciler.
	if pr.Status.StartTime == nil {
		return fmt.Errorf("ignoring the metrics recording for %s, pipelinerun has no start time", pr.Name)
	}
	if len(pr.Status.Conditions) == 0 {
		return fmt.Errorf("ignoring the metrics recording for %s, pipelinerun has no status condition", pr.Name)
	}

	duration := time.Since(pr.Status.StartTime.Time)
	if pr.Status.CompletionTime != nil {
		duration = pr.Status.CompletionTime.Sub(pr.Status.StartTime.Time)
	}

	status := "success"
	if pr.Status.Conditions[0].Status == corev1.ConditionFalse {
		status = "failed"
	}

	ctx, err := tag.New(
		context.Background(),
		tag.Insert(r.pipeline, pr.Spec.PipelineRef.Name),
		tag.Insert(r.pipelineRun, pr.Name),
		tag.Insert(r.namespace, pr.Namespace),
		tag.Insert(r.status, status),
	)
	if err != nil {
		return err
	}

	// The duration is recorded in whole seconds (integer division truncates).
	metrics.Record(ctx, prDuration.M(float64(duration/time.Second)))
	metrics.Record(ctx, prCount.M(1))

	return nil
}

// RunningPrsCount lists every pipelinerun known to the lister, counts those
// that are not done yet and records that number as the running pipelineruns
// measure.
//
// It returns an error, without recording anything, when the recorder is not
// initialized, when listing fails, or when the tag context cannot be built.
func (r *Recorder) RunningPrsCount(lister listers.PipelineRunLister) error {
	if !r.initialized {
		return errors.New("ignoring the metrics recording, failed to initialize the metrics recorder")
	}

	prs, err := lister.List(labels.Everything())
	if err != nil {
		return fmt.Errorf("failed to list pipelineruns while generating metrics : %v", err)
	}

	var runningPrs int
	for _, pr := range prs {
		if !pr.IsDone() {
			runningPrs++
		}
	}

	ctx, err := tag.New(context.Background())
	if err != nil {
		// Previously this error was assigned but never inspected.
		return err
	}
	metrics.Record(ctx, runningPrsCount.M(float64(runningPrs)))

	return nil
}
144 changes: 144 additions & 0 deletions pkg/reconciler/pipelinerun/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package pipelinerun

import (
alpha1 "github.com/tektoncd/pipeline/pkg/client/informers/externalversions/pipeline/v1alpha1"
"testing"
"time"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
fakepipelineruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/pipelinerun/fake"
tb "github.com/tektoncd/pipeline/test/builder"
corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/apis"
"knative.dev/pkg/metrics/metricstest"
rtesting "knative.dev/pkg/reconciler/testing"
)

// Test_uninitialized_metrics verifies that a zero-value Recorder refuses to
// record: both recording methods must return an error.
func Test_uninitialized_metrics(t *testing.T) {
	var recorder Recorder

	errDuration := recorder.DurationAndCount(&v1alpha1.PipelineRun{})
	errRunning := recorder.RunningPrsCount(nil)

	assertErrNotNil(errDuration, "DurationAndCount recording expected to return error but got nil", t)
	assertErrNotNil(errRunning, "Current PR count recording expected to return error but got nil", t)
}

// Test_record_pipelinerun_duration_count checks that DurationAndCount records
// a 60 second duration and a count of one, with the expected tags, for both a
// succeeded and a failed pipelinerun.
func Test_record_pipelinerun_duration_count(t *testing.T) {
	startTime := time.Now()

	testData := []struct {
		name             string
		pipelineRun      *v1alpha1.PipelineRun
		expectedTags     map[string]string
		expectedDuration float64
		expectedCount    int64
	}{
		{
			name: "for_succeeded_pipeline",
			pipelineRun: tb.PipelineRun("pipelinerun-1", "ns",
				tb.PipelineRunSpec("pipeline-1"),
				tb.PipelineRunStatus(
					tb.PipelineRunStartTime(startTime),
					tb.PipelineRunCompletionTime(startTime.Add(1*time.Minute)),
					tb.PipelineRunStatusCondition(apis.Condition{
						Type:   apis.ConditionSucceeded,
						Status: corev1.ConditionTrue,
					}),
				)),
			expectedTags: map[string]string{
				"pipeline":    "pipeline-1",
				"pipelinerun": "pipelinerun-1",
				"namespace":   "ns",
				"status":      "success",
			},
			expectedDuration: 60,
			expectedCount:    1,
		},
		{
			name: "for_failed_pipeline",
			pipelineRun: tb.PipelineRun("pipelinerun-1", "ns",
				tb.PipelineRunSpec("pipeline-1"),
				tb.PipelineRunStatus(
					tb.PipelineRunStartTime(startTime),
					tb.PipelineRunCompletionTime(startTime.Add(1*time.Minute)),
					tb.PipelineRunStatusCondition(apis.Condition{
						Type:   apis.ConditionSucceeded,
						Status: corev1.ConditionFalse,
					}),
				)),
			expectedTags: map[string]string{
				"pipeline":    "pipeline-1",
				"pipelinerun": "pipelinerun-1",
				"namespace":   "ns",
				"status":      "failed",
			},
			expectedDuration: 60,
			expectedCount:    1,
		},
	}

	for _, test := range testData {
		t.Run(test.name, func(t *testing.T) {
			// Views are registered globally; drop them so the next case
			// starts from a clean slate.
			defer unregisterMetrics()

			metrics, err := NewRecorder()
			if err != nil {
				t.Fatalf("NewRecorder returned an error: %v", err)
			}

			err = metrics.DurationAndCount(test.pipelineRun)

			assertErrIsNil(err, "DurationAndCount recording got an error", t)
			metricstest.CheckDistributionData(t, "pipelinerun_duration_seconds", test.expectedTags, 1, test.expectedDuration, test.expectedDuration)
			metricstest.CheckCountData(t, "pipelinerun_count", test.expectedTags, test.expectedCount)
		})
	}
}

// Test_record_running_pipelineruns_count adds three pipelineruns to the fake
// informer (succeeded, failed and unknown status) and checks that only one is
// reported as running.
func Test_record_running_pipelineruns_count(t *testing.T) {
	defer unregisterMetrics()

	ctx, _ := rtesting.SetupFakeContext(t)
	informer := fakepipelineruninformer.Get(ctx)
	addPipelineRun(informer, "pipelinerun-1", "pipeline-1", "ns", corev1.ConditionTrue, t)
	addPipelineRun(informer, "pipelinerun-2", "pipeline-2", "ns", corev1.ConditionFalse, t)
	addPipelineRun(informer, "pipelinerun-3", "pipeline-3", "ns", corev1.ConditionUnknown, t)

	metrics, err := NewRecorder()
	if err != nil {
		t.Fatalf("NewRecorder returned an error: %v", err)
	}

	err = metrics.RunningPrsCount(informer.Lister())

	assertErrIsNil(err, "RunningPrsCount recording expected to return nil but got error", t)
	metricstest.CheckLastValueData(t, "running_pipelineruns_count", map[string]string{}, 1)
}

// addPipelineRun builds a pipelinerun named run in namespace ns that refers to
// the given pipeline and has a single Succeeded condition with the given
// status, then adds it to the informer's indexer. A failure to add is reported
// on t together with the underlying error.
func addPipelineRun(informer alpha1.PipelineRunInformer, run, pipeline, ns string, status corev1.ConditionStatus, t *testing.T) {
	t.Helper()

	pr := tb.PipelineRun(run, ns,
		tb.PipelineRunSpec(pipeline),
		tb.PipelineRunStatus(
			tb.PipelineRunStatusCondition(apis.Condition{
				Type:   apis.ConditionSucceeded,
				Status: status,
			}),
		))

	if err := informer.Informer().GetIndexer().Add(pr); err != nil {
		t.Errorf("Failed to add the pipelinerun %s: %v", run, err)
	}
}

// assertErrNotNil marks the test as failed with message when err is nil.
func assertErrNotNil(err error, message string, t *testing.T) {
	t.Helper()
	if err == nil {
		// t.Error, not t.Errorf: message is not a format string, so any '%'
		// in it must be printed verbatim.
		t.Error(message)
	}
}

// assertErrIsNil marks the test as failed when err is not nil, reporting
// message followed by the unexpected error.
func assertErrIsNil(err error, message string, t *testing.T) {
	t.Helper()
	if err != nil {
		// message is passed as an argument, never as the format string, and
		// the error itself is included to make the failure diagnosable.
		t.Errorf("%s: %v", message, err)
	}
}

// unregisterMetrics removes the three pipelinerun metric views so that each
// test can register them again through NewRecorder.
func unregisterMetrics() {
	metricstest.Unregister(
		"pipelinerun_duration_seconds",
		"pipelinerun_count",
		"running_pipelineruns_count",
	)
}
Loading

0 comments on commit bbae434

Please sign in to comment.