Skip to content

Commit

Permalink
fix: Add struct-wide RWMutext to metrics (#3421)
Browse files Browse the repository at this point in the history
  • Loading branch information
simster7 committed Jul 8, 2020
1 parent 0463f24 commit 733e95f
Showing 1 changed file with 39 additions and 11 deletions.
50 changes: 39 additions & 11 deletions workflow/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type metric struct {

type Metrics struct {
// Ensures mutual exclusion in workflows map
workflowsMutex sync.Mutex
mutex sync.RWMutex
metricsConfig ServerConfig
telemetryConfig ServerConfig

Expand Down Expand Up @@ -77,6 +77,9 @@ func New(metricsConfig, telemetryConfig ServerConfig) *Metrics {
}

func (m *Metrics) allMetrics() []prometheus.Metric {
m.mutex.RLock()
defer m.mutex.RUnlock()

allMetrics := []prometheus.Metric{
m.workflowsProcessed,
m.operationDurations,
Expand All @@ -98,8 +101,8 @@ func (m *Metrics) allMetrics() []prometheus.Metric {
}

func (m *Metrics) WorkflowAdded(key string, phase v1alpha1.NodePhase) {
m.workflowsMutex.Lock()
defer m.workflowsMutex.Unlock()
m.mutex.Lock()
defer m.mutex.Unlock()

if m.workflows[key] {
return
Expand All @@ -111,19 +114,23 @@ func (m *Metrics) WorkflowAdded(key string, phase v1alpha1.NodePhase) {
}

func (m *Metrics) WorkflowUpdated(key string, fromPhase, toPhase v1alpha1.NodePhase) {
m.workflowsMutex.Lock()
hasKey := m.workflows[key]
m.workflowsMutex.Unlock()
if fromPhase == toPhase || !hasKey {
m.mutex.Lock()
defer m.mutex.Unlock()

if fromPhase == toPhase || !m.workflows[key] {
return
}
m.WorkflowDeleted(key, fromPhase)
m.WorkflowAdded(key, toPhase)
if _, ok := m.workflowsByPhase[fromPhase]; ok {
m.workflowsByPhase[fromPhase].Dec()
}
if _, ok := m.workflowsByPhase[toPhase]; ok {
m.workflowsByPhase[toPhase].Inc()
}
}

func (m *Metrics) WorkflowDeleted(key string, phase v1alpha1.NodePhase) {
m.workflowsMutex.Lock()
defer m.workflowsMutex.Unlock()
m.mutex.Lock()
defer m.mutex.Unlock()

if !m.workflows[key] {
return
Expand All @@ -139,11 +146,17 @@ func (m *Metrics) OperationCompleted(durationSeconds float64) {
}

func (m *Metrics) GetCustomMetric(key string) prometheus.Metric {
m.mutex.RLock()
defer m.mutex.RUnlock()

// It's okay to return nil metrics in this function
return m.customMetrics[key].metric
}

func (m *Metrics) UpsertCustomMetric(key string, newMetric prometheus.Metric) error {
m.mutex.Lock()
defer m.mutex.Unlock()

if _, inUse := m.defaultMetricDescs[newMetric.Desc().String()]; inUse {
return fmt.Errorf("metric '%s' is already in use by the system, please use a different name", newMetric.Desc())
}
Expand All @@ -159,17 +172,26 @@ const (
)

func (m *Metrics) OperationPanic() {
m.mutex.Lock()
defer m.mutex.Unlock()

m.errors[ErrorCauseOperationPanic].Inc()
}

func (m *Metrics) CronWorkflowSubmissionError() {
m.mutex.Lock()
defer m.mutex.Unlock()

m.errors[ErrorCauseCronWorkflowSubmissionError].Inc()
}

// Act as a metrics provider for a workflow queue
var _ workqueue.MetricsProvider = &Metrics{}

func (m *Metrics) NewDepthMetric(name string) workqueue.GaugeMetric {
m.mutex.Lock()
defer m.mutex.Unlock()

key := fmt.Sprintf("%s-depth", name)
if _, ok := m.workqueueMetrics[key]; !ok {
m.workqueueMetrics[key] = newGauge("queue_depth_count", "Depth of the queue", map[string]string{"queue_name": name})
Expand All @@ -178,6 +200,9 @@ func (m *Metrics) NewDepthMetric(name string) workqueue.GaugeMetric {
}

func (m *Metrics) NewAddsMetric(name string) workqueue.CounterMetric {
m.mutex.Lock()
defer m.mutex.Unlock()

key := fmt.Sprintf("%s-adds", name)
if _, ok := m.workqueueMetrics[key]; !ok {
m.workqueueMetrics[key] = newCounter("queue_adds_count", "Adds to the queue", map[string]string{"queue_name": name})
Expand All @@ -186,6 +211,9 @@ func (m *Metrics) NewAddsMetric(name string) workqueue.CounterMetric {
}

func (m *Metrics) NewLatencyMetric(name string) workqueue.HistogramMetric {
m.mutex.Lock()
defer m.mutex.Unlock()

key := fmt.Sprintf("%s-latency", name)
if _, ok := m.workqueueMetrics[key]; !ok {
m.workqueueMetrics[key] = newHistogram("queue_latency", "Time objects spend waiting in the queue", map[string]string{"queue_name": name}, []float64{1.0, 5.0, 20.0, 60.0, 180.0})
Expand Down

0 comments on commit 733e95f

Please sign in to comment.