diff --git a/broker.go b/broker.go index 268696cf4..d0d5b87b8 100644 --- a/broker.go +++ b/broker.go @@ -59,7 +59,8 @@ type Broker struct { kerberosAuthenticator GSSAPIKerberosAuth clientSessionReauthenticationTimeMs int64 - throttleTimer *time.Timer + throttleTimer *time.Timer + throttleTimerLock sync.Mutex } // SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker @@ -1697,6 +1698,8 @@ func (b *Broker) handleThrottledResponse(resp protocolBody) { } func (b *Broker) setThrottle(throttleTime time.Duration) { + b.throttleTimerLock.Lock() + defer b.throttleTimerLock.Unlock() if b.throttleTimer != nil { // if there is an existing timer stop/clear it if !b.throttleTimer.Stop() { @@ -1707,6 +1710,8 @@ func (b *Broker) setThrottle(throttleTime time.Duration) { } func (b *Broker) waitIfThrottled() { + b.throttleTimerLock.Lock() + defer b.throttleTimerLock.Unlock() if b.throttleTimer != nil { DebugLogger.Printf("broker/%d waiting for throttle timer\n", b.ID()) <-b.throttleTimer.C