diff --git a/broker.go b/broker.go index 268696cf4..d9542a4f0 100644 --- a/broker.go +++ b/broker.go @@ -59,7 +59,7 @@ type Broker struct { kerberosAuthenticator GSSAPIKerberosAuth clientSessionReauthenticationTimeMs int64 - throttleTimer *time.Timer + throttleTimer atomic.Value } // SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker @@ -86,6 +86,10 @@ const ( SASLExtKeyAuth = "auth" ) +var ( + emptyTimer = time.NewTimer(time.Duration(0)) +) + // AccessToken contains an access token used to authenticate a // SASL/OAUTHBEARER client along with associated metadata. type AccessToken struct { @@ -1697,20 +1701,24 @@ func (b *Broker) handleThrottledResponse(resp protocolBody) { } func (b *Broker) setThrottle(throttleTime time.Duration) { - if b.throttleTimer != nil { + value := b.throttleTimer.Load() + if value != nil && value != emptyTimer { + throttleTimer := value.(*time.Timer) // if there is an existing timer stop/clear it - if !b.throttleTimer.Stop() { - <-b.throttleTimer.C + if !throttleTimer.Stop() { + <-throttleTimer.C } } - b.throttleTimer = time.NewTimer(throttleTime) + b.throttleTimer.CompareAndSwap(value, time.NewTimer(throttleTime)) } func (b *Broker) waitIfThrottled() { - if b.throttleTimer != nil { + value := b.throttleTimer.Load() + if value != nil && value != emptyTimer { + throttleTimer := value.(*time.Timer) + b.throttleTimer.CompareAndSwap(throttleTimer, emptyTimer) DebugLogger.Printf("broker/%d waiting for throttle timer\n", b.ID()) - <-b.throttleTimer.C - b.throttleTimer = nil + <-throttleTimer.C } }