-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Add struct-wide RWMutex to metrics #3421
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,7 +37,7 @@ type metric struct { | |
|
||
type Metrics struct { | ||
// Ensures mutual exclusion in workflows map | ||
workflowsMutex sync.Mutex | ||
mutex sync.RWMutex | ||
metricsConfig ServerConfig | ||
telemetryConfig ServerConfig | ||
|
||
|
@@ -77,6 +77,9 @@ func New(metricsConfig, telemetryConfig ServerConfig) *Metrics { | |
} | ||
|
||
func (m *Metrics) allMetrics() []prometheus.Metric { | ||
m.mutex.RLock() | ||
defer m.mutex.RUnlock() | ||
|
||
allMetrics := []prometheus.Metric{ | ||
m.workflowsProcessed, | ||
m.operationDurations, | ||
|
@@ -98,8 +101,8 @@ func (m *Metrics) allMetrics() []prometheus.Metric { | |
} | ||
|
||
func (m *Metrics) WorkflowAdded(key string, phase v1alpha1.NodePhase) { | ||
m.workflowsMutex.Lock() | ||
defer m.workflowsMutex.Unlock() | ||
m.mutex.Lock() | ||
defer m.mutex.Unlock() | ||
|
||
if m.workflows[key] { | ||
return | ||
|
@@ -111,19 +114,23 @@ func (m *Metrics) WorkflowAdded(key string, phase v1alpha1.NodePhase) { | |
} | ||
|
||
func (m *Metrics) WorkflowUpdated(key string, fromPhase, toPhase v1alpha1.NodePhase) { | ||
m.workflowsMutex.Lock() | ||
hasKey := m.workflows[key] | ||
m.workflowsMutex.Unlock() | ||
if fromPhase == toPhase || !hasKey { | ||
m.mutex.Lock() | ||
defer m.mutex.Unlock() | ||
|
||
if fromPhase == toPhase || !m.workflows[key] { | ||
return | ||
} | ||
m.WorkflowDeleted(key, fromPhase) | ||
m.WorkflowAdded(key, toPhase) | ||
if _, ok := m.workflowsByPhase[fromPhase]; ok { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this copy-and-paste? Maybe make a private func which does not lock? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not quite copy-and-paste: there is some logic that would be redundant if we go with the private func approach; mainly, we would remove and re-add the key from the map and perform an additional check unnecessarily. I actually think this is simpler and easier to understand. However, I don't feel strongly and would gladly go with the private func approach if you think it's necessary. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We should just make sure that if we need to make changes in two places, we are aware of it. |
||
m.workflowsByPhase[fromPhase].Dec() | ||
} | ||
if _, ok := m.workflowsByPhase[toPhase]; ok { | ||
m.workflowsByPhase[toPhase].Inc() | ||
} | ||
} | ||
|
||
func (m *Metrics) WorkflowDeleted(key string, phase v1alpha1.NodePhase) { | ||
m.workflowsMutex.Lock() | ||
defer m.workflowsMutex.Unlock() | ||
m.mutex.Lock() | ||
defer m.mutex.Unlock() | ||
|
||
if !m.workflows[key] { | ||
return | ||
|
@@ -139,11 +146,17 @@ func (m *Metrics) OperationCompleted(durationSeconds float64) { | |
} | ||
|
||
func (m *Metrics) GetCustomMetric(key string) prometheus.Metric { | ||
m.mutex.RLock() | ||
defer m.mutex.RUnlock() | ||
|
||
// It's okay to return nil metrics in this function | ||
return m.customMetrics[key].metric | ||
} | ||
|
||
func (m *Metrics) UpsertCustomMetric(key string, newMetric prometheus.Metric) error { | ||
m.mutex.Lock() | ||
defer m.mutex.Unlock() | ||
|
||
if _, inUse := m.defaultMetricDescs[newMetric.Desc().String()]; inUse { | ||
return fmt.Errorf("metric '%s' is already in use by the system, please use a different name", newMetric.Desc()) | ||
} | ||
|
@@ -159,17 +172,26 @@ const ( | |
) | ||
|
||
func (m *Metrics) OperationPanic() { | ||
m.mutex.Lock() | ||
defer m.mutex.Unlock() | ||
|
||
m.errors[ErrorCauseOperationPanic].Inc() | ||
} | ||
|
||
func (m *Metrics) CronWorkflowSubmissionError() { | ||
m.mutex.Lock() | ||
defer m.mutex.Unlock() | ||
|
||
m.errors[ErrorCauseCronWorkflowSubmissionError].Inc() | ||
} | ||
|
||
// Act as a metrics provider for a workflow queue | ||
var _ workqueue.MetricsProvider = &Metrics{} | ||
|
||
func (m *Metrics) NewDepthMetric(name string) workqueue.GaugeMetric { | ||
m.mutex.Lock() | ||
defer m.mutex.Unlock() | ||
|
||
key := fmt.Sprintf("%s-depth", name) | ||
if _, ok := m.workqueueMetrics[key]; !ok { | ||
m.workqueueMetrics[key] = newGauge("queue_depth_count", "Depth of the queue", map[string]string{"queue_name": name}) | ||
|
@@ -178,6 +200,9 @@ func (m *Metrics) NewDepthMetric(name string) workqueue.GaugeMetric { | |
} | ||
|
||
func (m *Metrics) NewAddsMetric(name string) workqueue.CounterMetric { | ||
m.mutex.Lock() | ||
defer m.mutex.Unlock() | ||
|
||
key := fmt.Sprintf("%s-adds", name) | ||
if _, ok := m.workqueueMetrics[key]; !ok { | ||
m.workqueueMetrics[key] = newCounter("queue_adds_count", "Adds to the queue", map[string]string{"queue_name": name}) | ||
|
@@ -186,6 +211,9 @@ func (m *Metrics) NewAddsMetric(name string) workqueue.CounterMetric { | |
} | ||
|
||
func (m *Metrics) NewLatencyMetric(name string) workqueue.HistogramMetric { | ||
m.mutex.Lock() | ||
defer m.mutex.Unlock() | ||
|
||
key := fmt.Sprintf("%s-latency", name) | ||
if _, ok := m.workqueueMetrics[key]; !ok { | ||
m.workqueueMetrics[key] = newHistogram("queue_latency", "Time objects spend waiting in the queue", map[string]string{"queue_name": name}, []float64{1.0, 5.0, 20.0, 60.0, 180.0}) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will it handle concurrent reads and writes on the map?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a synchronized map in Go (sync.Map):
https://golang.org/pkg/sync/#Map