Skip to content

Commit

Permalink
Add a gauge metric for current experiments (kubeflow#954)
Browse files Browse the repository at this point in the history
* add a gauge metric for current experiments

Signed-off-by: yeya24 <yb532204897@gmail.com>

* fmt & fix test

Signed-off-by: yeya24 <yb532204897@gmail.com>
  • Loading branch information
yeya24 authored and k8s-ci-robot committed Dec 9, 2019
1 parent 5d46799 commit cb7d3e7
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 44 deletions.
5 changes: 4 additions & 1 deletion pkg/controller.v1alpha3/experiment/experiment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler {

r.Generator = manifest.New(r.Client)
r.updateStatusHandler = r.updateStatus
r.collector = util.NewExpsCollector(mgr.GetCache())
return r
}

Expand Down Expand Up @@ -159,6 +160,8 @@ type ReconcileExperiment struct {
manifest.Generator
// updateStatusHandler is defined for test purpose.
updateStatusHandler updateStatusFunc
// collector is a wrapper for experiment metrics.
collector *util.ExperimentsCollector
}

// Reconcile reads the state of the cluster for an Experiment object and makes changes based on the state read
Expand Down Expand Up @@ -235,7 +238,7 @@ func (r *ReconcileExperiment) ReconcileExperiment(instance *experimentsv1alpha3.
return err
}
if len(trials.Items) > 0 {
if err := util.UpdateExperimentStatus(instance, trials); err != nil {
if err := util.UpdateExperimentStatus(r.collector, instance, trials); err != nil {
logger.Error(err, "Update experiment status error")
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/golang/mock/gomock"
"github.com/kubeflow/katib/pkg/controller.v1alpha3/experiment/util"
"github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -169,6 +170,7 @@ spec:
scheme: mgr.GetScheme(),
Suggestion: suggestion,
Generator: generator,
collector: util.NewExpsCollector(mgr.GetCache()),
}
r.updateStatusHandler = func(instance *experimentsv1alpha3.Experiment) error {
if !instance.IsCreated() {
Expand Down
5 changes: 2 additions & 3 deletions pkg/controller.v1alpha3/experiment/experiment_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
experimentsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1alpha3"
suggestionsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1alpha3"
trialsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1alpha3"
utilv1alpha3 "github.com/kubeflow/katib/pkg/controller.v1alpha3/experiment/util"
"github.com/kubeflow/katib/pkg/controller.v1alpha3/util"
)

Expand Down Expand Up @@ -102,9 +101,9 @@ func (r *ReconcileExperiment) updateFinalizers(instance *experimentsv1alpha3.Exp
return reconcile.Result{}, err
} else {
if !instance.ObjectMeta.DeletionTimestamp.IsZero() {
utilv1alpha3.IncreaseExperimentsDeletedCount()
r.collector.IncreaseExperimentsDeletedCount(instance.Namespace)
} else {
utilv1alpha3.IncreaseExperimentsCreatedCount()
r.collector.IncreaseExperimentsCreatedCount(instance.Namespace)
}
// Need to requeue because finalizer update does not change metadata.generation
return reconcile.Result{Requeue: true}, err
Expand Down
137 changes: 104 additions & 33 deletions pkg/controller.v1alpha3/experiment/util/prometheus_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,50 +17,121 @@ limitations under the License.
package util

import (
"github.com/prometheus/client_golang/prometheus"
"context"

"github.com/kubeflow/katib/pkg/apis/controller/experiments/v1alpha3"
"github.com/prometheus/client_golang/prometheus"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

var (
experimentsDeletedCount = prometheus.NewCounter(prometheus.CounterOpts{
Name: "katib_experiment_deleted_total",
Help: "Counts number of Experiment deleted",
})
experimentsCreatedCount = prometheus.NewCounter(prometheus.CounterOpts{
Name: "katib_experiment_created_total",
Help: "Counts number of Experiment created",
})
experimentsSucceededCount = prometheus.NewCounter(prometheus.CounterOpts{
Name: "katib_experiment_succeeded_total",
Help: "Counts number of Experiment succeeded",
})
experimentsFailedCount = prometheus.NewCounter(prometheus.CounterOpts{
Name: "katib_experiment_failed_total",
Help: "Counts number of Experiment failed",
})
)
// ExperimentsCollector bundles the experiment-related Prometheus metrics and
// implements prometheus.Collector through its Describe and Collect methods.
type ExperimentsCollector struct {
// store is the cache that collect() lists Experiment objects from.
store cache.Cache
// expDeleteCount counts deleted experiments, labelled by "namespace".
expDeleteCount *prometheus.CounterVec
// expCreateCount counts created experiments, labelled by "namespace".
expCreateCount *prometheus.CounterVec
// expSucceedCount counts succeeded experiments, labelled by "namespace".
expSucceedCount *prometheus.CounterVec
// expFailCount counts failed experiments, labelled by "namespace".
expFailCount *prometheus.CounterVec
// expCurrent is the gauge of experiments currently in the cache, labelled by
// "namespace" and "status"; collect() resets and repopulates it.
expCurrent *prometheus.GaugeVec
}

func NewExpsCollector(store cache.Cache) *ExperimentsCollector {
c := &ExperimentsCollector{
store: store,
expDeleteCount: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "katib_experiment_deleted_total",
Help: "The total number of deleted experiments",
}, []string{"namespace"}),

func init() {
metrics.Registry.MustRegister(
experimentsDeletedCount,
experimentsCreatedCount,
experimentsSucceededCount,
experimentsFailedCount)
expCreateCount: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "katib_experiment_created_total",
Help: "The total number of created experiments",
}, []string{"namespace"}),

expSucceedCount: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "katib_experiment_succeeded_total",
Help: "The total number of succeeded experiments",
}, []string{"namespace"}),

expFailCount: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "katib_experiment_failed_total",
Help: "The total number of failed experiments",
}, []string{"namespace"}),

expCurrent: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "katib_experiments_current",
Help: "The number of current katib experiments in the cluster",
}, []string{"namespace", "status"}),
}
metrics.Registry.MustRegister(c)
return c
}

func IncreaseExperimentsDeletedCount() {
experimentsDeletedCount.Inc()
// Describe implements the prometheus.Collector interface by forwarding the
// descriptors of every wrapped metric vector to ch.
func (c *ExperimentsCollector) Describe(ch chan<- *prometheus.Desc) {
	// Same emission order as before: deleted, succeeded, failed, created, current.
	vectors := []prometheus.Collector{
		c.expDeleteCount,
		c.expSucceedCount,
		c.expFailCount,
		c.expCreateCount,
		c.expCurrent,
	}
	for _, vec := range vectors {
		vec.Describe(ch)
	}
}

func IncreaseExperimentsCreatedCount() {
experimentsCreatedCount.Inc()
// Collect implements the prometheus.Collector interface. It first refreshes
// the current-experiments gauge via collect() and then forwards the samples
// of every wrapped metric vector to ch.
func (c *ExperimentsCollector) Collect(ch chan<- prometheus.Metric) {
	// The gauge has to be rebuilt before anything is emitted.
	c.collect()

	// Same emission order as before: deleted, succeeded, failed, created, current.
	vectors := []prometheus.Collector{
		c.expDeleteCount,
		c.expSucceedCount,
		c.expFailCount,
		c.expCreateCount,
		c.expCurrent,
	}
	for _, vec := range vectors {
		vec.Collect(ch)
	}
}

func IncreaseExperimentsSucceededCount() {
experimentsSucceededCount.Inc()
// IncreaseExperimentsDeletedCount adds one to the deleted-experiments counter
// of the namespace ns.
func (c *ExperimentsCollector) IncreaseExperimentsDeletedCount(ns string) {
	deleted := c.expDeleteCount.WithLabelValues(ns)
	deleted.Inc()
}

func IncreaseExperimentsFailedCount() {
experimentsFailedCount.Inc()
// IncreaseExperimentsCreatedCount adds one to the created-experiments counter
// of the namespace ns.
func (c *ExperimentsCollector) IncreaseExperimentsCreatedCount(ns string) {
	created := c.expCreateCount.WithLabelValues(ns)
	created.Inc()
}

// IncreaseExperimentsSucceededCount adds one to the succeeded-experiments
// counter of the namespace ns.
func (c *ExperimentsCollector) IncreaseExperimentsSucceededCount(ns string) {
	succeeded := c.expSucceedCount.WithLabelValues(ns)
	succeeded.Inc()
}

// IncreaseExperimentsFailedCount adds one to the failed-experiments counter
// of the namespace ns.
func (c *ExperimentsCollector) IncreaseExperimentsFailedCount(ns string) {
	failed := c.expFailCount.WithLabelValues(ns)
	failed.Inc()
}

// collect rebuilds the expCurrent gauge from the experiments held in the
// cache, counting them per namespace and per last condition type.
func (c *ExperimentsCollector) collect() {
	experiments := &v1alpha3.ExperimentList{}
	if listErr := c.store.List(context.TODO(), nil, experiments); listErr != nil {
		// Listing failed: bail out and leave the gauge exactly as it was.
		return
	}

	// counts[namespace][status] holds the number of matching experiments.
	counts := map[string]map[string]int{}
	for i := range experiments.Items {
		item := &experiments.Items[i]

		// Fall back to "Unknown" when the last condition type cannot be
		// determined (the original note: the experiment has no condition).
		status := "Unknown"
		if condType, condErr := item.GetLastConditionType(); condErr == nil {
			status = string(condType)
		}

		byStatus, seen := counts[item.Namespace]
		if !seen {
			byStatus = make(map[string]int)
			counts[item.Namespace] = byStatus
		}
		byStatus[status]++
	}

	// Clear the old series before publishing, so namespace/status pairs that
	// are no longer present are not reported any more.
	c.expCurrent.Reset()
	for namespace, byStatus := range counts {
		for status, total := range byStatus {
			c.expCurrent.WithLabelValues(namespace, status).Set(float64(total))
		}
	}
}
14 changes: 7 additions & 7 deletions pkg/controller.v1alpha3/experiment/util/status_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ const (
ExperimentKilledReason = "ExperimentKilled"
)

func UpdateExperimentStatus(instance *experimentsv1alpha3.Experiment, trials *trialsv1alpha3.TrialList) error {
func UpdateExperimentStatus(collector *ExperimentsCollector, instance *experimentsv1alpha3.Experiment, trials *trialsv1alpha3.TrialList) error {

isObjectiveGoalReached := updateTrialsSummary(instance, trials)

if !instance.IsCompleted() {
UpdateExperimentStatusCondition(instance, isObjectiveGoalReached, false)
UpdateExperimentStatusCondition(collector, instance, isObjectiveGoalReached, false)
}
return nil

Expand Down Expand Up @@ -135,7 +135,7 @@ func getObjectiveMetricValue(trial trialsv1alpha3.Trial, objectiveMetricName str
return nil
}

func UpdateExperimentStatusCondition(instance *experimentsv1alpha3.Experiment, isObjectiveGoalReached bool, getSuggestionDone bool) {
func UpdateExperimentStatusCondition(collector *ExperimentsCollector, instance *experimentsv1alpha3.Experiment, isObjectiveGoalReached bool, getSuggestionDone bool) {

completedTrialsCount := instance.Status.TrialsSucceeded + instance.Status.TrialsFailed + instance.Status.TrialsKilled
failedTrialsCount := instance.Status.TrialsFailed
Expand All @@ -145,31 +145,31 @@ func UpdateExperimentStatusCondition(instance *experimentsv1alpha3.Experiment, i
msg := "Experiment has succeeded because Objective goal has reached"
instance.MarkExperimentStatusSucceeded(ExperimentSucceededReason, msg)
instance.Status.CompletionTime = &now
IncreaseExperimentsSucceededCount()
collector.IncreaseExperimentsSucceededCount(instance.Namespace)
return
}

if (instance.Spec.MaxTrialCount != nil) && (completedTrialsCount >= *instance.Spec.MaxTrialCount) {
msg := "Experiment has succeeded because max trial count has reached"
instance.MarkExperimentStatusSucceeded(ExperimentSucceededReason, msg)
instance.Status.CompletionTime = &now
IncreaseExperimentsSucceededCount()
collector.IncreaseExperimentsSucceededCount(instance.Namespace)
return
}

if getSuggestionDone && (instance.Status.TrialsPending+instance.Status.TrialsRunning) == 0 {
msg := "Experiment has succeeded because suggestion service has reached the end"
instance.MarkExperimentStatusSucceeded(ExperimentSucceededReason, msg)
instance.Status.CompletionTime = &now
IncreaseExperimentsSucceededCount()
collector.IncreaseExperimentsSucceededCount(instance.Namespace)
return
}

if (instance.Spec.MaxFailedTrialCount != nil) && (failedTrialsCount >= *instance.Spec.MaxFailedTrialCount) {
msg := "Experiment has failed because max failed count has reached"
instance.MarkExperimentStatusFailed(ExperimentFailedReason, msg)
instance.Status.CompletionTime = &now
IncreaseExperimentsFailedCount()
collector.IncreaseExperimentsFailedCount(instance.Namespace)
return
}

Expand Down

0 comments on commit cb7d3e7

Please sign in to comment.