
Commit

wip
Chris Martin committed Apr 13, 2023
1 parent ece9f3c commit 5a7d6c5
Showing 3 changed files with 78 additions and 34 deletions.
40 changes: 21 additions & 19 deletions internal/common/metrics/scheduler_metrics.go
@@ -202,43 +202,44 @@ func Describe(out chan<- *prometheus.Desc) {
}
}

func CollectQueueMetrics(queueCounts map[string]int, metricsProvider QueueMetricProvider, metrics chan<- prometheus.Metric) {
func CollectQueueMetrics(queueCounts map[string]int, metricsProvider QueueMetricProvider) []prometheus.Metric {
metrics := make([]prometheus.Metric, 0, len(AllDescs))
for q, count := range queueCounts {
metrics <- NewQueueSizeMetric(count, q)
metrics = append(metrics, NewQueueSizeMetric(count, q))
queuedJobMetrics := metricsProvider.GetQueuedJobMetrics(q)
runningJobMetrics := metricsProvider.GetRunningJobMetrics(q)
for _, m := range queuedJobMetrics {
queueDurations := m.Durations
if queueDurations.GetCount() > 0 {
metrics <- NewQueueDuration(m.Durations.GetCount(), queueDurations.GetSum(), queueDurations.GetBuckets(), m.Pool, m.PriorityClass, q)
metrics <- NewMinQueueDuration(queueDurations.GetMin(), m.Pool, m.PriorityClass, q)
metrics <- NewMaxQueueDuration(queueDurations.GetMax(), m.Pool, m.PriorityClass, q)
metrics <- NewMedianQueueDuration(queueDurations.GetMedian(), m.Pool, m.PriorityClass, q)
metrics = append(metrics, NewQueueDuration(m.Durations.GetCount(), queueDurations.GetSum(), queueDurations.GetBuckets(), m.Pool, m.PriorityClass, q))
metrics = append(metrics, NewMinQueueDuration(queueDurations.GetMin(), m.Pool, m.PriorityClass, q))
metrics = append(metrics, NewMaxQueueDuration(queueDurations.GetMax(), m.Pool, m.PriorityClass, q))
metrics = append(metrics, NewMedianQueueDuration(queueDurations.GetMedian(), m.Pool, m.PriorityClass, q))
}

// Sort the keys so we get a predicatable output order
// Sort the keys so we get a predictable output order
resources := maps.Keys(m.Resources)
slices.Sort(resources)

for _, resourceType := range resources {
amount := m.Resources[resourceType]
if amount.GetCount() > 0 {
metrics <- NewQueueResources(amount.GetSum(), m.Pool, m.PriorityClass, q, resourceType)
metrics <- NewMinQueueResources(amount.GetMin(), m.Pool, m.PriorityClass, q, resourceType)
metrics <- NewMaxQueueResources(amount.GetMax(), m.Pool, m.PriorityClass, q, resourceType)
metrics <- NewMedianQueueResources(amount.GetMedian(), m.Pool, m.PriorityClass, q, resourceType)
metrics <- NewCountQueueResources(amount.GetCount(), m.Pool, m.PriorityClass, q, resourceType)
metrics = append(metrics, NewQueueResources(amount.GetSum(), m.Pool, m.PriorityClass, q, resourceType))
metrics = append(metrics, NewMinQueueResources(amount.GetMin(), m.Pool, m.PriorityClass, q, resourceType))
metrics = append(metrics, NewMaxQueueResources(amount.GetMax(), m.Pool, m.PriorityClass, q, resourceType))
metrics = append(metrics, NewMedianQueueResources(amount.GetMedian(), m.Pool, m.PriorityClass, q, resourceType))
metrics = append(metrics, NewCountQueueResources(amount.GetCount(), m.Pool, m.PriorityClass, q, resourceType))
}
}
}

for _, m := range runningJobMetrics {
runningJobDurations := m.Durations
if runningJobDurations.GetCount() > 0 {
metrics <- NewJobRunRunDuration(m.Durations.GetCount(), runningJobDurations.GetSum(), runningJobDurations.GetBuckets(), m.Pool, m.PriorityClass, q)
metrics <- NewMinJobRunDuration(runningJobDurations.GetMin(), m.Pool, m.PriorityClass, q)
metrics <- NewMaxJobRunDuration(runningJobDurations.GetMax(), m.Pool, m.PriorityClass, q)
metrics <- NewMedianJobRunDuration(runningJobDurations.GetMedian(), m.Pool, m.PriorityClass, q)
metrics = append(metrics, NewJobRunRunDuration(m.Durations.GetCount(), runningJobDurations.GetSum(), runningJobDurations.GetBuckets(), m.Pool, m.PriorityClass, q))
metrics = append(metrics, NewMinJobRunDuration(runningJobDurations.GetMin(), m.Pool, m.PriorityClass, q))
metrics = append(metrics, NewMaxJobRunDuration(runningJobDurations.GetMax(), m.Pool, m.PriorityClass, q))
metrics = append(metrics, NewMedianJobRunDuration(runningJobDurations.GetMedian(), m.Pool, m.PriorityClass, q))
}

// Sort the keys so we get a predicatable output order
@@ -248,13 +249,14 @@ func CollectQueueMetrics(queueCounts map[string]int, metricsProvider QueueMetric
for _, resourceType := range resources {
amount := m.Resources[resourceType]
if amount.GetCount() > 0 {
metrics <- NewMinQueueAllocated(amount.GetMin(), m.Pool, m.PriorityClass, q, resourceType)
metrics <- NewMaxQueueAllocated(amount.GetMax(), m.Pool, m.PriorityClass, q, resourceType)
metrics <- NewMedianQueueAllocated(amount.GetMedian(), m.Pool, m.PriorityClass, q, resourceType)
metrics = append(metrics, NewMinQueueAllocated(amount.GetMin(), m.Pool, m.PriorityClass, q, resourceType))
metrics = append(metrics, NewMaxQueueAllocated(amount.GetMax(), m.Pool, m.PriorityClass, q, resourceType))
metrics = append(metrics, NewMedianQueueAllocated(amount.GetMedian(), m.Pool, m.PriorityClass, q, resourceType))
}
}
}
}
return metrics
}

func NewQueueSizeMetric(value int, queue string) prometheus.Metric {
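Note on the signature change above: CollectQueueMetrics no longer writes to a channel passed in by the caller but returns a []prometheus.Metric slice, so a Collector that previously handed its channel straight through now has to forward the returned slice itself. Below is a minimal sketch of that forwarding, assuming the scheduler package context shown in the next file of this diff (MetricsCollector, metricsState and the commonmetrics import); it is one plausible way to fill in the emptied Collect body, not necessarily the author's intended follow-up.

func (c *MetricsCollector) Collect(metrics chan<- prometheus.Metric) {
	state, ok := c.state.Load().(metricsState)
	if ok {
		// Build the slice with the refactored helper, then forward each metric
		// onto the channel supplied by the Prometheus registry.
		for _, metric := range commonmetrics.CollectQueueMetrics(state.numQueuedJobs(), state) {
			metrics <- metric
		}
	}
}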
71 changes: 56 additions & 15 deletions internal/scheduler/metrics.go
@@ -2,6 +2,8 @@ package scheduler

import (
"context"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/google/uuid"
"sync/atomic"
"time"

@@ -60,31 +62,34 @@ func (m metricsState) numQueuedJobs() map[string]int {
// MetricsCollector is a Prometheus Collector that handles scheduler metrics.
// The metrics themselves are calculated asynchronously every refreshPeriod
type MetricsCollector struct {
jobDb *jobdb.JobDb
queueRepository database.QueueRepository
poolAssigner PoolAssigner
refreshPeriod time.Duration
clock clock.Clock
state atomic.Value
jobDb *jobdb.JobDb
queueRepository database.QueueRepository
executorRepository database.ExecutorRepository
poolAssigner PoolAssigner
refreshPeriod time.Duration
clock clock.Clock
state atomic.Value
}

func NewMetricsCollector(
jobDb *jobdb.JobDb,
queueRepository database.QueueRepository,
executorRepository database.ExecutorRepository,
poolAssigner PoolAssigner,
refreshPeriod time.Duration,
) *MetricsCollector {
return &MetricsCollector{
jobDb: jobDb,
queueRepository: queueRepository,
poolAssigner: poolAssigner,
refreshPeriod: refreshPeriod,
clock: clock.RealClock{},
state: atomic.Value{},
jobDb: jobDb,
queueRepository: queueRepository,
executorRepository: executorRepository,
poolAssigner: poolAssigner,
refreshPeriod: refreshPeriod,
clock: clock.RealClock{},
state: atomic.Value{},
}
}

// Run enters s a loop which updates the metrics every refreshPeriod until the supplied comtext is cancelled
// Run enters a loop which updates the metrics every refreshPeriod until the supplied context is cancelled
func (c *MetricsCollector) Run(ctx context.Context) error {
ticker := c.clock.NewTicker(c.refreshPeriod)
log.Infof("Will update metrics every %s", c.refreshPeriod)
@@ -111,7 +116,7 @@ func (c *MetricsCollector) Describe(out chan<- *prometheus.Desc) {
func (c *MetricsCollector) Collect(metrics chan<- prometheus.Metric) {
state, ok := c.state.Load().(metricsState)
if ok {
commonmetrics.CollectQueueMetrics(state.numQueuedJobs(), state, metrics)

}
}

@@ -124,6 +129,11 @@ func (c *MetricsCollector) refresh(ctx context.Context) error {
return err
}

executors, err := c.executorRepository.GetExecutors(ctx)
if err != nil {
return err
}

err = c.poolAssigner.Refresh(ctx)
if err != nil {
return err
@@ -141,7 +151,8 @@ func (c *MetricsCollector) refresh(ctx context.Context) error {
}

currentTime := c.clock.Now()
for _, job := range c.jobDb.GetAll(c.jobDb.ReadTxn()) {
txn := c.jobDb.ReadTxn()
for _, job := range c.jobDb.GetAll(txn) {
// Don't calculate metrics for dead jobs
if job.InTerminalState() {
continue
@@ -180,6 +191,36 @@ func (c *MetricsCollector) refresh(ctx context.Context) error {
recorder.RecordJobRuntime(pool, priorityClass, timeInState)
recorder.RecordResources(pool, priorityClass, jobResources)
}

queueMetrics := commonmetrics.CollectQueueMetrics(ms.numQueuedJobs(), ms)

type phaseKey struct {
cluster string
pool string
queueName string
phase string
nodeType string
}
countsByKey := map[phaseKey]int{}
for _, executor := range executors {
for _, node := range executor.Nodes {
for runId, jobRunState := range node.StateByJobRunId {
job := c.jobDb.GetByRunId(txn, uuid.MustParse(runId))
if job != nil {
phase := schedulerobjects.JobRunState_name[int32(jobRunState)]
key := phaseKey{
cluster: executor.Id,
pool: executor.Pool,
queueName: job.Queue(),
phase: phase,
nodeType: "",
}
countsByKey[key]++
}
}
}
}

c.state.Store(ms)
log.Debugf("Refreshed prometheus metrics in %s", time.Since(start))
return nil
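The countsByKey aggregation above is computed but not yet exported anywhere in this WIP commit. As a hedged sketch of where it could go next, the helper below converts the counts into constant gauge metrics; the metric name, help text and the jobRunPhaseMetrics function are hypothetical and not part of this commit, and phaseKey would need to be hoisted from refresh to package scope for this to compile.

var jobRunPhaseDesc = prometheus.NewDesc(
	"armada_scheduler_job_run_phase", // hypothetical metric name, not from this commit
	"Number of job runs by cluster, pool, queue, phase and node type.",
	[]string{"cluster", "pool", "queueName", "phase", "nodeType"},
	nil,
)

// jobRunPhaseMetrics turns the aggregated counts into constant gauges that a
// Collector could forward onto its output channel.
func jobRunPhaseMetrics(countsByKey map[phaseKey]int) []prometheus.Metric {
	result := make([]prometheus.Metric, 0, len(countsByKey))
	for k, count := range countsByKey {
		result = append(result, prometheus.MustNewConstMetric(
			jobRunPhaseDesc,
			prometheus.GaugeValue,
			float64(count),
			k.cluster, k.pool, k.queueName, k.phase, k.nodeType,
		))
	}
	return result
}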
1 change: 1 addition & 0 deletions internal/scheduler/schedulerapp.go
@@ -172,6 +172,7 @@ func Run(config Configuration) error {
metricsCollector := NewMetricsCollector(
scheduler.jobDb,
queueRepository,
nil,
poolAssigner,
config.Metrics.RefreshInterval)
prometheus.MustRegister(metricsCollector)
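The nil argument above presumably stands in for the new executor repository parameter while the commit is a work in progress. Assuming an executor repository value is constructed earlier in Run (its construction is not shown in this diff), the final wiring would presumably look like the sketch below.

metricsCollector := NewMetricsCollector(
	scheduler.jobDb,
	queueRepository,
	executorRepository, // assumed to be built earlier in Run; not shown in this diff
	poolAssigner,
	config.Metrics.RefreshInterval)
prometheus.MustRegister(metricsCollector)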
