Skip to content

Commit

Permalink
Current level gauge for FlyteWorkflows (flyteorg#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
wild-endeavor committed May 18, 2020
1 parent 185da4b commit 2fc3405
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 1 deletion.
2 changes: 1 addition & 1 deletion flytepropeller/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ storage:
access-key: minio
auth-type: accesskey
disable-ssl: true
endpoint: http://localhost:9000
endpoint: http://localhost:30084
region: us-east-1
secret-key: miniostorage
type: minio
Expand Down
110 changes: 110 additions & 0 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package controller

import (
"context"
"runtime/pprof"
"time"

"github.com/lyft/flytestdlib/contextutils"
"k8s.io/apimachinery/pkg/labels"

stdErrs "github.com/lyft/flytestdlib/errors"

Expand Down Expand Up @@ -33,11 +38,15 @@ import (
clientset "github.com/lyft/flytepropeller/pkg/client/clientset/versioned"
flyteScheme "github.com/lyft/flytepropeller/pkg/client/clientset/versioned/scheme"
informers "github.com/lyft/flytepropeller/pkg/client/informers/externalversions"
lister "github.com/lyft/flytepropeller/pkg/client/listers/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/controller/nodes"
"github.com/lyft/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
"github.com/lyft/flytepropeller/pkg/controller/workflow"
)

// resourceLevelMonitorCycleDuration is how often the ResourceLevelMonitor collects and emits gauge values.
const resourceLevelMonitorCycleDuration = 5 * time.Second

// missing is the project/domain label value used when a workflow has no execution identifier.
const missing = "missing"

type metrics struct {
Scope promutils.Scope
EnqueueCountWf prometheus.Counter
Expand All @@ -57,6 +66,7 @@ type Controller struct {
recorder record.EventRecorder
metrics *metrics
leaderElector *leaderelection.LeaderElector
levelMonitor *ResourceLevelMonitor
}

// Runs either as a leader -if configured- or as a standalone process.
Expand Down Expand Up @@ -86,6 +96,9 @@ func (c *Controller) run(ctx context.Context) error {
return err
}

// Start the collector process
c.levelMonitor.RunCollector(ctx)

// Start the informer factories to begin populating the informer caches
logger.Info(ctx, "Starting FlyteWorkflow controller")
return c.workerPool.Run(ctx, c.numWorkers, c.flyteworkflowSynced)
Expand Down Expand Up @@ -169,6 +182,101 @@ func (c *Controller) getWorkflowUpdatesHandler() cache.ResourceEventHandler {
}
}

// ResourceLevelMonitor is responsible for emitting metrics that show the current number of Flyte workflows,
// cut by project and domain. It needs to be kicked off (see RunCollector). The periodicity is not currently
// configurable because it seems unnecessary. It will also emit a timer measuring how long it takes to run
// each measurement cycle.
type ResourceLevelMonitor struct {
	// Scope under which the collection-cycle stopwatch and the levels gauge are registered.
	Scope promutils.Scope

	// Meta timer - this times each collection cycle to measure how long it takes to collect the levels GaugeVec below
	CollectorTimer promutils.StopWatch

	// System Observability: This is a labeled gauge that emits the current number of FlyteWorkflow objects in the informer. It is used
	// to monitor current levels. It currently only splits by project/domain, not workflow status.
	levels *prometheus.GaugeVec

	// The thing that we want to measure the current levels of
	lister lister.FlyteWorkflowLister
}

// countList tallies the given workflows into a nested map keyed first by project, then by domain.
// Workflows without an execution identifier are grouped under the "missing" label for both keys.
func (r *ResourceLevelMonitor) countList(ctx context.Context, workflows []*v1alpha1.FlyteWorkflow) map[string]map[string]int {
	// Map of Projects to Domains to counts
	tallies := map[string]map[string]int{}

	for _, flyteWf := range workflows {
		project, domain := missing, missing
		if execID := flyteWf.GetExecutionID(); execID.WorkflowExecutionIdentifier != nil {
			project = flyteWf.ExecutionID.Project
			domain = flyteWf.ExecutionID.Domain
		} else {
			logger.Warningf(ctx, "Workflow does not have an execution identifier! [%v]", flyteWf)
		}

		domains, found := tallies[project]
		if !found {
			domains = map[string]int{}
			tallies[project] = domains
		}
		domains[domain]++
	}

	return tallies
}

// collect lists all FlyteWorkflows known to the informer and emits one gauge value per
// project/domain combination. Aggregation across projects/domains is left to Prometheus.
func (r *ResourceLevelMonitor) collect(ctx context.Context) {
	// Emit gauges at both the project/domain level - aggregation to be handled by Prometheus
	workflows, err := r.lister.List(labels.Everything())
	if err != nil {
		logger.Errorf(ctx, "Error listing workflows when attempting to collect data for gauges %s", err)
		// Nothing was listed, so there is nothing to emit; previously we fell through and
		// iterated an empty count map anyway. Bail out explicitly instead.
		return
	}

	counts := r.countList(ctx, workflows)

	// Emit labeled metrics, for each project/domain combination. This can be aggregated later with Prometheus queries.
	metricKeys := []contextutils.Key{contextutils.ProjectKey, contextutils.DomainKey}
	for project, val := range counts {
		for domain, num := range val {
			tempContext := contextutils.WithProjectDomain(ctx, project, domain)
			gauge, err := r.levels.GetMetricWith(contextutils.Values(tempContext, metricKeys...))
			if err != nil {
				// The gauge's label set is fixed at construction time, so a mismatch here is a
				// programmer error, not a runtime condition — treat it as such.
				panic(err)
			}
			gauge.Set(float64(num))
		}
	}
}

// RunCollector starts a background goroutine that periodically (every resourceLevelMonitorCycleDuration)
// collects and emits current FlyteWorkflow levels until the given context is cancelled.
func (r *ResourceLevelMonitor) RunCollector(ctx context.Context) {
	ticker := time.NewTicker(resourceLevelMonitorCycleDuration)
	collectorCtx := contextutils.WithGoroutineLabel(ctx, "resource-level-monitor")

	go func() {
		// Release the ticker's resources when the collector shuts down; the original
		// never stopped it, leaking the ticker for the life of the process.
		defer ticker.Stop()
		pprof.SetGoroutineLabels(collectorCtx)
		for {
			select {
			case <-collectorCtx.Done():
				return
			case <-ticker.C:
				// Time each collection cycle so slow cycles are observable.
				t := r.CollectorTimer.Start()
				r.collect(collectorCtx)
				t.Stop()
			}
		}
	}()
}

// NewResourceLevelMonitor constructs a ResourceLevelMonitor that reads current FlyteWorkflow
// levels from the supplied lister and reports them under the given metrics scope.
func NewResourceLevelMonitor(scope promutils.Scope, lister lister.FlyteWorkflowLister) *ResourceLevelMonitor {
	cycleTimer := scope.MustNewStopWatch("collection_cycle", "Measures how long it takes to run a collection", time.Millisecond)
	levelGauge := scope.MustNewGaugeVec("flyteworkflow", "Current FlyteWorkflow levels",
		contextutils.ProjectKey.String(), contextutils.DomainKey.String())

	return &ResourceLevelMonitor{
		Scope:          scope,
		CollectorTimer: cycleTimer,
		levels:         levelGauge,
		lister:         lister,
	}
}

func newControllerMetrics(scope promutils.Scope) *metrics {
c := scope.MustNewCounterVec("wf_enqueue", "workflow enqueue count.", "type")
return &metrics{
Expand Down Expand Up @@ -298,6 +406,8 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter
return nil, stdErrs.Wrapf(errors3.CausedByError, err, "failed to initialize workflow store")
}

controller.levelMonitor = NewResourceLevelMonitor(scope.NewSubScope("collector"), flyteworkflowInformer.Lister())

nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink,
launchPlanActor, launchPlanActor, cfg.MaxDatasetSizeBytes,
storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient, catalogClient, scope)
Expand Down
83 changes: 83 additions & 0 deletions flytepropeller/pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package controller

import (
"context"
"strings"
"testing"
"time"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
listers "github.com/lyft/flytepropeller/pkg/client/listers/flyteworkflow/v1alpha1"
"github.com/lyft/flytestdlib/promutils"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/labels"
)

// wfs is a shared test fixture: two workflows in project "proj" and one in project "proj2",
// all in the "dev" domain, so counts should be proj/dev=2 and proj2/dev=1.
var wfs = []*v1alpha1.FlyteWorkflow{
	{
		ExecutionID: v1alpha1.ExecutionID{
			WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{
				Project: "proj",
				Domain:  "dev",
				Name:    "name",
			},
		},
	},
	{
		ExecutionID: v1alpha1.ExecutionID{
			WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{
				Project: "proj",
				Domain:  "dev",
				Name:    "name",
			},
		},
	},
	{
		ExecutionID: v1alpha1.ExecutionID{
			WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{
				Project: "proj2",
				Domain:  "dev",
				Name:    "name",
			},
		},
	},
}

// TestNewResourceLevelMonitor verifies that countList groups the fixture workflows by project and domain.
func TestNewResourceLevelMonitor(t *testing.T) {
	monitor := ResourceLevelMonitor{}
	counts := monitor.countList(context.Background(), wfs)
	assert.Equal(t, 2, counts["proj"]["dev"])
	assert.Equal(t, 1, counts["proj2"]["dev"])
}

// mockWFLister is a test double satisfying listers.FlyteWorkflowLister: List always returns
// the shared wfs fixture, ignoring the selector. Unimplemented methods come from the embedded interface.
type mockWFLister struct {
	listers.FlyteWorkflowLister
}

// List ignores the selector and returns the fixture workflows with no error.
func (m mockWFLister) List(_ labels.Selector) (ret []*v1alpha1.FlyteWorkflow, err error) {
	return wfs, nil
}

// TestResourceLevelMonitor_collect runs one collection cycle against the mock lister and checks
// that the gauge emits the expected value for each project/domain pair.
func TestResourceLevelMonitor_collect(t *testing.T) {
	testScope := promutils.NewScope("testscope")
	gaugeVec := testScope.MustNewGaugeVec("unittest", "testing", "project", "domain")

	monitor := &ResourceLevelMonitor{
		Scope:          testScope,
		CollectorTimer: testScope.MustNewStopWatch("collection_cycle", "Measures how long it takes to run a collection", time.Millisecond),
		levels:         gaugeVec,
		lister:         mockWFLister{},
	}
	monitor.collect(context.Background())

	expected := `
# HELP testscope:unittest testing
# TYPE testscope:unittest gauge
testscope:unittest{domain="dev",project="proj"} 2
testscope:unittest{domain="dev",project="proj2"} 1
`

	assert.NoError(t, testutil.CollectAndCompare(gaugeVec, strings.NewReader(expected)))
}

0 comments on commit 2fc3405

Please sign in to comment.