Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions concurrency/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,9 @@ const (
// high. If response times exceed this threshold, it could signal that the system or the external service
// is under heavy load and may benefit from scaling down concurrency to alleviate pressure.
ResponseTimeCriticalThreshold = 2 * time.Second

debounceScaleDownThreshold = 5 // Number of consecutive triggers before scaling down

//
AcceptableAverageResponseTime = 100 * time.Millisecond
)
45 changes: 27 additions & 18 deletions concurrency/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math"
"net/http"
"strconv"
"sync"
"time"

"go.uber.org/zap"
Expand Down Expand Up @@ -194,39 +195,47 @@ func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) in

// A slice to hold the last n response times for averaging
var responseTimes []time.Duration
var responseTimesLock sync.Mutex

// MonitorResponseTimeVariability monitors the response time variability and suggests a concurrency adjustment.
// MonitorResponseTimeVariability monitors the response time variability and suggests a concurrency adjustment.
func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) int {
ch.Metrics.Lock.Lock() // Ensure thread safety when accessing shared metrics
defer ch.Metrics.Lock.Unlock()
ch.Metrics.ResponseTimeVariability.Lock.Lock()
defer ch.Metrics.ResponseTimeVariability.Lock.Unlock()

// Append the latest response time
responseTimesLock.Lock() // Ensure safe concurrent access
responseTimes = append(responseTimes, responseTime)
if len(responseTimes) > 10 { // Use the last 10 measurements for a smoother average
responseTimes = responseTimes[1:]
if len(responseTimes) > 10 {
responseTimes = responseTimes[1:] // Maintain last 10 measurements
}
responseTimesLock.Unlock()

stdDev := calculateStdDev(responseTimes)
averageResponseTime := calculateAverage(responseTimes)

// Action determination with debounce effect
// Debounce mechanism for scaling down
const debounceCount = 3 // Threshold must be exceeded in 3 consecutive checks to act
if stdDev > (ch.Metrics.ResponseTimeVariability.StdDevThreshold * 2) {
// Multi-factor check before scaling down
if stdDev > ch.Metrics.ResponseTimeVariability.StdDevThreshold && averageResponseTime > AcceptableAverageResponseTime {
ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount++
ch.logger.Info("Increased debounce counter", zap.Int("counter", ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount))
if ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount >= debounceCount {
ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount = 0 // reset counter after action
ch.logger.Info("Concurrent requests scaling down due to high response time variability")
if ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount >= debounceScaleDownThreshold {
ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount = 0
return -1 // Suggest decrease concurrency
}
} else {
ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount = 0 // reset counter if condition not met
if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold && len(ch.sem) < cap(ch.sem) {
ch.logger.Info("Concurrent requests scaling up as conditions are favorable")
return 1 // Suggest increase concurrency if there is capacity
ch.Metrics.ResponseTimeVariability.DebounceScaleDownCount = 0 // Reset counter if conditions are not met
if stdDev <= ch.Metrics.ResponseTimeVariability.StdDevThreshold {
return 1 // Suggest increase concurrency if conditions are favorable
}
}
return 0
return 0 // Default to no change
}

// calculateAverage computes the average response time from a slice of response times.
func calculateAverage(times []time.Duration) time.Duration {
var total time.Duration
for _, t := range times {
total += t
}
return total / time.Duration(len(times))
}

// calculateStdDev computes the standard deviation of response times.
Expand Down