Skip to content

Commit

Permalink
The producer can be easily blocked due to a race condition in Broker.…
Browse files Browse the repository at this point in the history
…throttleTimer, which may result in a panic. Consider changing the type of Broker.throttleTimer to atomic.Value

Fixes IBM#2823

Signed-off-by: chengsha  <shacheng@tencent.com>
  • Loading branch information
shacheng committed Mar 5, 2024
1 parent fd84c2b commit 45e18cf
Showing 1 changed file with 16 additions and 8 deletions.
24 changes: 16 additions & 8 deletions broker.go
Expand Up @@ -59,7 +59,7 @@ type Broker struct {
kerberosAuthenticator GSSAPIKerberosAuth
clientSessionReauthenticationTimeMs int64

throttleTimer *time.Timer
throttleTimer atomic.Value
}

// SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
Expand All @@ -86,6 +86,10 @@ const (
SASLExtKeyAuth = "auth"
)

var (
emptyTimer = time.NewTimer(time.Duration(0))
)

// AccessToken contains an access token used to authenticate a
// SASL/OAUTHBEARER client along with associated metadata.
type AccessToken struct {
Expand Down Expand Up @@ -1697,20 +1701,24 @@ func (b *Broker) handleThrottledResponse(resp protocolBody) {
}

func (b *Broker) setThrottle(throttleTime time.Duration) {
if b.throttleTimer != nil {
value := b.throttleTimer.Load()
if value != nil && value != emptyTimer {
throttleTimer := value.(*time.Timer)
// if there is an existing timer stop/clear it
if !b.throttleTimer.Stop() {
<-b.throttleTimer.C
if !throttleTimer.Stop() {
<-throttleTimer.C
}
}
b.throttleTimer = time.NewTimer(throttleTime)
b.throttleTimer.CompareAndSwap(value, time.NewTimer(throttleTime))
}

func (b *Broker) waitIfThrottled() {
if b.throttleTimer != nil {
value := b.throttleTimer.Load()
if value != nil && value != emptyTimer {
throttleTimer := value.(*time.Timer)
b.throttleTimer.CompareAndSwap(throttleTimer, emptyTimer)
DebugLogger.Printf("broker/%d waiting for throttle timer\n", b.ID())
<-b.throttleTimer.C
b.throttleTimer = nil
<-throttleTimer.C
}
}

Expand Down

0 comments on commit 45e18cf

Please sign in to comment.